Source code for k1lib.cli.utils

# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
"""
This is for all short and random quality-of-life utilities."""
from k1lib.cli.init import patchDefaultDelim, BaseCli, yieldT
import k1lib.cli as cli, k1lib.cli.init as init, numbers, numpy as np, dis
from k1lib.cli.typehint import *
from typing import overload, Iterator, Any, List, Set, Union, Callable
import k1lib, time, math, os, json, dill
from collections import defaultdict
try: import torch; hasTorch = True
except: torch = k1lib.Object().withAutoDeclare(lambda: type("RandomClass", (object, ), {})); hasTorch = False
try: import PIL; hasPIL = True
except: hasPIL = False
plt = k1lib.dep.plt
try: import genpy, rosbag; hasRos1 = True
except: hasRos1 = False
try: import pandas as pd; pd.core; hasPandas = True
except: hasPandas = False
# Names exported by a star-import of this module.
__all__ = ["size", "shape", "item", "rItem", "iden", "join", "wrapList",
           "equals", "reverse", "ignore", "rateLimit", "timeLimit", "tab", "indent",
           "clipboard", "deref", "bindec", "smooth", "disassemble",
           "tree", "lookup", "lookupRange", "getitems", "backup", "sketch", "syncStepper", "zeroes", "normalize", "branch"]
# Shorthand for the cli settings namespace; read by most tools in this module.
settings = k1lib.settings.cli
def exploreSize(it):
    """Peeks at ``it`` and reports ``(first element, number of elements)``.

    - Strings are not split into characters: the result is ``(None, len(it))``.
    - Anything that supports both ``it[0]`` and ``len(it)`` is answered directly.
    - Everything else is iterated to exhaustion in order to count it, so a
      one-shot iterator passed in here is consumed.

    An empty input gives ``(None, 0)``."""
    if isinstance(it, str):
        return None, len(it)
    try:
        return it[0], len(it)
    except:  # intentionally broad: any failure just means "count by iterating"
        pass
    stream = iter(it)
    missing = object()  # unique marker, so that None can be a legitimate first element
    first = next(stream, missing)
    if first is missing:
        return None, 0
    return first, 1 + sum(1 for _ in stream)
class size(BaseCli):
    def __init__(self, idx=None):
        """Returns number of rows and columns in the input.
        Example::

            # returns (3, 2)
            [[2, 3], [4, 5, 6], [3]] | shape()
            # returns 3
            [[2, 3], [4, 5, 6], [3]] | shape(0)
            # returns 2
            [[2, 3], [4, 5, 6], [3]] | shape(1)
            # returns (2, 0)
            [[], [2, 3]] | shape()
            # returns (3,)
            [2, 3, 5] | shape()
            # returns 3
            [2, 3, 5] | shape(0)
            # returns (3, 2, 2)
            [[[2, 1], [0, 6, 7]], 3, 5] | shape()
            # returns (1, 3)
            ["abc"] | shape()
            # returns (1, 2, 3)
            [torch.randn(2, 3)] | shape()
            # returns (2, 3, 5)
            shape()(np.random.randn(2, 3, 5))

        :class:`shape` is an alias of this cli. Use whichever is more intuitive for you.

        :param idx: if not specified, returns a tuple of ints. If specified,
            then returns the specific index of the tuple"""
        super().__init__()
        self.idx = idx
        # Pre-built pipeline that drills down ``idx`` levels before the final count
        if idx is not None:
            self._f = cli.item(idx)

    def _all_array_opt(self, it, level):
        # Broadcast the trailing shape over every position of the leading ``level`` dims
        res = np.array(it.shape[level:])[tuple([None]*level)] + np.zeros(it.shape[:level], dtype=int)[(*[slice(None)]*level, None)]
        return res if self.idx is None else res | cli.rItem(self.idx).all(level)

    def _typehint(self, inp):
        if self.idx is not None:
            return int
        return tList(int)

    def __ror__(self, it:Iterator[str]):
        """Computes the size of ``it``: an int if ``idx`` was given, else a tuple of ints."""
        idx = self.idx
        if idx == 0:  # super quick path for the really common case
            try:
                return len(it)
            except:
                try:
                    return exploreSize(it)[1]
                except:
                    pass  # fall through to the generic paths below
        if hasPIL and isinstance(it, PIL.Image.Image):
            return it.size if idx is None else it.size[idx]
        if hasPandas and isinstance(it, pd.core.frame.DataFrame):
            # ``it.shape`` is (rows, columns). The previous ``it.size // len(it)``
            # divided by zero whenever the frame had no rows.
            rows, cols = it.shape
            s = (rows, cols)
            return s if idx is None else s[idx]
        if idx is None:
            answer = []
            try:
                while True:
                    if isinstance(it, settings.arrayTypes):
                        return tuple(answer + list(it.shape))
                    # descend into the first element, recording the length of this level
                    it, s = exploreSize(it)
                    answer.append(s)
            except TypeError:
                pass  # reached something that can't be iterated: no deeper levels
            return tuple(answer)
        return exploreSize(it | self._f)[1]

    def _jsF(self, meta):
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()
        post = "" if self.idx is None else f"[{cli.kjs.v(self.idx)}]"
        return f"{fIdx} = ({dataIdx}) => {dataIdx}.shape(){post}", fIdx
shape = size # size noFill = object() # size
class item(BaseCli):
    def __init__(self, amt:int=1, fill=noFill):
        """Returns the first element of the input iterator.
        Example::

            # returns 0
            range(5) | item()
            # returns torch.Size([5])
            torch.randn(3,4,5) | item(2) | shape()
            # returns 3
            [] | item(fill=3)

        :param amt: how many times do you want to call item() back to back?
        :param fill: if iterator length is 0, return this"""
        self.amt = amt; self.fill = fill
        # Preprocessed, to be faster: splatted into next() as its optional default.
        # Identity test on purpose: ``fill != noFill`` would invoke the fill value's
        # own ``__ne__``, which for array-like fills has no single truth value.
        self.fillP = [fill] if fill is not noFill else []
        if self.amt != 1:
            self._f = cli.serial(*(item(fill=self.fill) for _ in range(self.amt)))

    def _all_array_opt(self, it, level):
        return it[(*[slice(None, None, None) for i in range(level)], 0)]

    def _typehint(self, inp):
        if isinstance(inp, tListIterSet): return inp.child
        if isinstance(inp, tCollection): return inp.children[0]
        if isinstance(inp, tArrayTypes):
            if inp.rank is None: return inp.__class__(inp.child, None)
            if inp.rank - self.amt >= 1: return inp.__class__(inp.child, inp.rank-self.amt)
            return inp.child
        return tAny()

    def __ror__(self, it:Iterator[str]):
        """Takes the first element of ``it``, ``amt`` times in a row."""
        if self.amt != 1: return it | self._f
        if isinstance(it, settings.arrayTypes): return it[0]
        if hasPandas and isinstance(it, pd.DataFrame): return it[:1].to_numpy()[0]
        # With no fill configured, an empty input raises StopIteration from next()
        return next(iter(init.dfGuard(it)), *self.fillP)

    def _jsF(self, meta):
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto(); _slice = "".join(["[0]"]*self.amt)
        return f"{fIdx} = ({dataIdx}) => {dataIdx}{_slice}", fIdx
class rItem(BaseCli):
    def __init__(self, idx:int):
        """Picks out the element at position ``idx``. Shorthand for the
        common ``rows(idx) | item()`` pattern.
        Example::

            iter(range(10)) | rItem(4) # returns 4

        :param idx: position of the element to return"""
        self.idx = idx
        # Types that get indexed directly instead of being iterated over
        self.arrayTypes = (*settings.arrayTypes, list, tuple)

    def _all_array_opt(self, it, level:int):
        leading = [slice(None, None, None) for _ in range(level)]
        return it[(*leading, self.idx)]

    def __ror__(self, it):
        """Returns element ``idx`` of ``it``."""
        idx = self.idx
        if isinstance(it, self.arrayTypes):
            return it[idx]
        if hasPandas and isinstance(it, pd.DataFrame):
            return it[idx:idx+1].to_numpy()[0]
        # Generic iterables: pull exactly idx+1 elements and keep the last one pulled
        for _, e in zip(range(idx + 1), it):
            pass
        return e

    def _jsF(self, meta):
        fIdx = init._jsFAuto()
        dataIdx = init._jsDAuto()
        return f"{fIdx} = ({dataIdx}) => {dataIdx}[{cli.kjs.v(self.idx)}]", fIdx
class iden(BaseCli):
    def __init__(self):
        """Identity cli: hands back its input untouched. Useful for multiple streams.
        Example::

            # returns range(5)
            range(5) | iden()"""
        super().__init__()

    def _all_array_opt(self, it, level):
        return it

    def _typehint(self, inp):
        return inp

    def __ror__(self, it:Iterator[Any]):
        return it

    def _jsF(self, meta):
        fIdx = init._jsFAuto()
        dataIdx = init._jsDAuto()
        return f"{fIdx} = ({dataIdx}) => {dataIdx}", fIdx
class join(BaseCli):
    def __init__(self, delim:str=None):
        r"""Concatenates every incoming element into one string, placing
        `delim` between them. Basically :meth:`str.join`, except that each
        element is passed through ``str`` first.
        Example::

            # returns '2\na'
            [2, "a"] | join("\n")

        :param delim: separator; run through ``patchDefaultDelim`` before being stored"""
        super().__init__()
        self.delim = patchDefaultDelim(delim)

    def _typehint(self, inp):
        return str

    def __ror__(self, it:Iterator[str]):
        pieces = init.dfGuard(it) | cli.apply(str)
        return self.delim.join(pieces)

    def _jsF(self, meta):
        fIdx = init._jsFAuto()
        dataIdx = init._jsDAuto()
        return f"{fIdx} = ({dataIdx}) => {dataIdx}.join({json.dumps(self.delim)})", fIdx
class wrapList(BaseCli):
    def __init__(self):
        """Puts the input inside a one-element list. There's a more advanced
        cli tool built from this, which is :meth:`~k1lib.cli.structural.unsqueeze`.
        Example::

            # returns [5]
            5 | wrapList()"""
        super().__init__()

    def _all_array_opt(self, it, level):
        leading = [slice(None)] * level
        return it[(*leading, None)]

    def _typehint(self, inp):
        return tList(inp)

    def __ror__(self, it) -> List[Any]:
        # Array types gain a new leading axis instead of being boxed in a Python list
        if isinstance(it, settings.arrayTypes):
            return it[None]
        return [it]

    def _jsF(self, meta):
        fIdx = init._jsFAuto()
        dataIdx = init._jsDAuto()
        return f"{fIdx} = ({dataIdx}) => [{dataIdx}]", fIdx
# Private exception type with no payload, meant for early-exit control flow rather than error reporting
class _EarlyExp(Exception): pass
class equals:
    def __init__(self):
        """Checks if all incoming columns/streams are identical"""
        super().__init__()

    def __ror__(self, streams:Iterator[Iterator[str]]):
        """Walks all streams in lockstep and yields one bool per position:
        True when every stream holds the same value as the first stream there,
        False otherwise. Stops at the end of the shortest stream."""
        columns = list(streams)
        for row in zip(*columns):
            reference = row[0]
            # any() stops comparing at the first mismatch in this row
            yield not any(reference != elem for elem in row)
class reverse(BaseCli):
    def __init__(self):
        """Flips the order of the incoming elements.
        Example::

            # returns [3, 5, 2]
            [2, 5, 3] | reverse() | deref()"""
        super().__init__()

    def _all_array_opt(self, it, level):
        leading = [slice(None)] * level
        return it[(*leading, slice(None, None, -1))]

    def _typehint(self, inp):
        if isinstance(inp, tListIterSet):
            return tIter(inp.child)
        return tAny()

    def __ror__(self, it:Iterator[str]) -> List[str]:
        # Array types and pandas objects are reversed by slicing
        sliceable = isinstance(it, settings.arrayTypes) or (hasPandas and isinstance(it, pd.core.arraylike.OpsMixin))
        if sliceable:
            return it[::-1]
        # Everything else is materialized into a list first, then reversed lazily
        return reversed(list(it))

    def _jsF(self, meta):
        fIdx = init._jsFAuto()
        dataIdx = init._jsDAuto()
        return f"{fIdx} = ({dataIdx}) => [...{dataIdx}].reverse()", fIdx
class ignore(BaseCli):
    def __init__(self):
        r"""Drains the input iterator and throws every element away. Handy for
        forcing a lazy pipeline to actually execute.
        Example::

            # will just return an iterator, and not print anything
            [2, 3] | apply(lambda x: print(x))
            # will prints "2\n3"
            [2, 3] | apply(lambda x: print(x)) | ignore()"""
        super().__init__()

    def _all_array_opt(self, it, level):
        return it

    def _typehint(self, inp):
        return type(None)

    def __ror__(self, it:Iterator[Any]):
        # Array types and pandas objects are not looped over at all
        alreadyMaterialized = isinstance(it, settings.arrayTypes) or (hasPandas and isinstance(it, pd.core.arraylike.OpsMixin))
        if alreadyMaterialized:
            return
        for _ in it:
            pass

    def _jsF(self, meta):
        fIdx = init._jsFAuto()
        dataIdx = init._jsDAuto()
        return f"{fIdx} = ({dataIdx}) => {dataIdx}", fIdx
class rateLimit(BaseCli):
    def __init__(self, f, delay=0.1):
        """Holds elements back until a condition allows them through.
        Example::

            s = 0; semaphore = 0
            def heavyAsyncOperation(i):
                global semaphore, s
                semaphore += 1
                s += i; time.sleep(1)
                semaphore -= 1; return i**2

            # returns (20,), takes 1s to run
            range(20) | applyTh(heavyAsyncOperation, 100) | shape()
            # returns (20,), takes 4s to run (20/5 = 4)
            range(20) | rateLimit(lambda: semaphore < 5) | applyTh(heavyAsyncOperation, 100) | shape()

        The first test case is not rate-limited, so it will run all 20 threads
        at the same time, and all of them will finish after 1 second.

        The second test case is rate-limited, so that there can only be 5
        concurrently executing threads because of the semaphore count check.
        Therefore this takes around 4 seconds to run.

        :param f: checking function. Should return true if execution is allowed
        :param delay: delay in seconds between calling ``f()``"""
        self.f = f
        self.delay = delay

    def _typehint(self, inp):
        if isinstance(inp, tListIterSet):
            return tIter(inp.child)
        if isinstance(inp, tArrayTypes):
            if inp.rank is None:
                return tIter(inp)
            if inp.rank >= 2:
                return tIter(inp.__class__(inp.child, inp.rank - 1))
            return tIter(inp.child)
        if isinstance(inp, tCollection):
            return inp
        return tAny()

    def __ror__(self, it):
        """Yields the elements of ``it`` one by one, polling ``f()`` every
        ``delay`` seconds before each element until it returns truthy."""
        allowed = self.f
        pause = self.delay
        for e in init.dfGuard(it):
            while not allowed():
                time.sleep(pause)
            yield e

    @staticmethod
    def cpu(maxUtilization=90):
        """Limits flow rate when cpu utilization is more than a specified
        percentage amount. Needs to install the package ``psutil`` to actually work.
        Example::

            # returns [0, 1, 4, 9, 16]
            range(5) | rateLimit.cpu() | apply(op()**2) | deref()

        :param maxUtilization: elements are let through only while
            ``psutil.cpu_percent()`` is below this value"""
        import psutil
        return rateLimit(lambda: psutil.cpu_percent() < maxUtilization)
class timeLimit(BaseCli):
    def __init__(self, t):
        """Cuts the stream off once a given amount of time has elapsed.
        Example::

            # returns 20, or roughly close to that
            repeatF(lambda: time.sleep(0.1)) | timeLimit(2) | shape(0)

        :param t: time budget in seconds, counted from when iteration starts"""
        self.t = t

    def _typehint(self, inp):
        if isinstance(inp, tListIterSet):
            return tIter(inp.child)
        if isinstance(inp, tArrayTypes):
            if inp.rank is None:
                return tIter(inp)
            if inp.rank >= 2:
                return tIter(inp.__class__(inp.child, inp.rank - 1))
            return tIter(inp.child)
        if isinstance(inp, tCollection):
            return inp
        return tAny()

    def __ror__(self, it):
        """Yields elements of ``it`` until the deadline passes. The clock is
        checked after each element is yielded, so at least one element of a
        non-empty input always gets through."""
        now = time.time
        deadline = now() + self.t
        for e in init.dfGuard(it):
            yield e
            if now() > deadline:
                return
def tab(pad:str=" "*4):
    """Prefixes every incoming element with ``pad``, i.e. indents a stream of lines.
    Example::

        # prints out indented 0 to 9
        range(10) | tab() | headOut()

    :param pad: string placed in front of each element, 4 spaces by default"""
    indenter = cli.apply(lambda x: f"{pad}{x}")
    return indenter
# ``indent`` is an alias of :func:`tab`
indent = tab
class clipboard(BaseCli):
    def __init__(self):
        """Copies whatever is piped in to the system clipboard.
        Example::

            # copies "abc" into the clipboard. Just use Ctrl+V to paste as usual
            "abc" | clipboard()"""
        # Imported here, so ``pyperclip`` is only required when this cli is actually used
        import pyperclip
        self.pyperclip = pyperclip

    def _typehint(self, inp):
        return type(None)

    def __ror__(self, s):
        self.pyperclip.copy(s)
# Types that deref treats as atomic: instances are returned as-is and never iterated over
a = [numbers.Number, np.number, str, bool, bytes, k1lib.UValue, cli.conv.Audio]
if hasTorch: a.append(torch.nn.Module)
if hasRos1: a.append(rosbag.bag.BagMessage)
if hasPandas: a.append(pd.core.arraylike.OpsMixin)
settings.atomic.add("deref", tuple(a), "used by deref")
Tensor = torch.Tensor; atomic = settings.atomic


class inv_dereference(BaseCli):
    def __init__(self, igT=False):
        """Kinda the inverse to :class:`dereference`

        :param igT: if False, 0-dimensional arrays are unwrapped with ``.item()``;
            if True, arrays are passed through unchanged"""
        super().__init__(); self.igT = igT

    def __ror__(self, it:Iterator[Any]) -> List[Any]:
        for e in it:
            if e is None or isinstance(e, atomic.deref): yield e
            elif isinstance(e, settings.arrayTypes):
                if not self.igT and len(e.shape) == 0: yield e.item()
                else: yield e
            else:
                # Try to recurse into the element; anything that can't be piped is yielded as-is
                try: yield e | self
                except: yield e


_rosmsg_tempfile = [None]; _rosmsg_autoInc = k1lib.AutoIncrement()


def rosmsg2BagMessage(msg):
    """Writes ``msg`` into a temporary bag file and reads it back as a bag message.

    Kinda abandoned. Turns out you can't pickle a BagMessage cleanly afterall.
    I kinda have to do it the long way. If you want to be able to serialize a
    message, just do `obj | deref()`, it will wrap around using RosMsg(), which
    is serializable"""
    # presumably ``b"" | cli.file()`` creates a temp file and returns its path - TODO confirm
    if _rosmsg_tempfile[0] is None: _rosmsg_tempfile[0] = b"" | cli.file()
    fn = f"{_rosmsg_tempfile[0]}_{os.getpid()}_{_rosmsg_autoInc()}"
    with rosbag.Bag(fn, "w") as bag: bag.write("/default", msg)
    # Read inside a ``with`` so that the bag is closed before its file is deleted
    with rosbag.Bag(fn, "r") as bag: res = bag.read_messages() | cli.item()
    os.remove(fn); return res


_rosmsg_tempfile2 = [None]; _rosmsg_autoInc2 = k1lib.AutoIncrement()


def _rosmsg_getFn2():
    """Returns a fresh scratch file name: a shared prefix plus the pid and a running counter."""
    if _rosmsg_tempfile2[0] is None:
        # Only the name is needed as a prefix, so the file itself is removed right away
        _rosmsg_tempfile2[0] = b"" | cli.file(); os.remove(_rosmsg_tempfile2[0])
    return f"{_rosmsg_tempfile2[0]}_{os.getpid()}_{_rosmsg_autoInc2()}"


class RosMsg:
    """Picklable wrapper around a ROS message. Attribute access is forwarded to
    the wrapped message, and (de)serialization round-trips through a temporary
    bag file."""

    def __init__(self, msg):
        # NOTE(review): purpose of ``_ab_sentinel`` is not visible in this file - kept as-is
        self._ab_sentinel = True; self.__msg = msg; self._ab_sentinel = False

    def __getattr__(self, attr):
        # Only reached when normal lookup fails. If the wrapped message itself is
        # missing (instance created without __init__, e.g. mid-unpickling), the
        # forwarding below would re-enter this method forever, so fail cleanly.
        if attr == "_RosMsg__msg": raise AttributeError(attr)
        # Code outside this class spells the attribute without name mangling
        if attr == "__msg": return self.__msg
        return getattr(self.__msg, attr)

    def __getstate__(self):
        fn = _rosmsg_getFn2()
        with rosbag.Bag(fn, "w") as bag: bag.write("/default", self.__msg)
        with open(fn, "rb") as f: raw = f.read()
        os.remove(fn); return {"raw": raw}

    def __setstate__(self, d):
        fn = _rosmsg_getFn2()
        with open(fn, "wb") as f: f.write(d["raw"])
        with rosbag.Bag(fn) as bag: self.__msg = next(bag.read_messages()).message
        os.remove(fn)

    def __repr__(self): return self.__msg.__repr__()


_rosMsgArrayTypes = k1lib.settings.cli.arrayTypes


class RosMsgPlaceholder:
    """Stand-in left inside a structure where a ROS message was lifted out. ``idx`` is its key in the message table."""

    def __init__(self, idx): self.idx = idx


def _rosmsg_complex_deref_replace(it, autoInc, msgs):
    """Recursively copies ``it``, moving every ROS message found into ``msgs``
    (keyed by ``autoInc()``) and leaving a :class:`RosMsgPlaceholder` behind."""
    if isinstance(it, np.number): return it.item()
    elif isinstance(it, k1lib.settings.cli.atomic.deref): return it
    elif isinstance(it, _rosMsgArrayTypes): return it
    elif isinstance(it, dict): _d = {k: _rosmsg_complex_deref_replace(v, autoInc, msgs) for k, v in it.items()}; return _d
    elif isinstance(it, tuple): _t = tuple(_rosmsg_complex_deref_replace(k, autoInc, msgs) for k in it); return _t
    elif isinstance(it, set): _s = set (_rosmsg_complex_deref_replace(k, autoInc, msgs) for k in it); return _s
    elif isinstance(it, genpy.message.Message): idx = autoInc(); msgs[idx] = it; return RosMsgPlaceholder(idx)
    # ``it.__msg`` is not name-mangled out here, so it goes through RosMsg.__getattr__
    elif isinstance(it, RosMsg): idx = autoInc(); msgs[idx] = it.__msg; return RosMsgPlaceholder(idx)
    try: iter(it)
    except: return it
    answer = []
    for e in it:
        if e is cli.yieldT: return answer
        answer.append(_rosmsg_complex_deref_replace(e, autoInc, msgs))
    return answer


def _rosmsg_complex_deref_reconstruct(it, msgs):
    """Inverse of :func:`_rosmsg_complex_deref_replace`: recursively copies
    ``it``, swapping every :class:`RosMsgPlaceholder` for ``msgs[placeholder.idx]``."""
    if isinstance(it, np.number): return it.item()
    elif isinstance(it, k1lib.settings.cli.atomic.deref): return it
    elif isinstance(it, _rosMsgArrayTypes): return it
    elif isinstance(it, dict): _d = {k: _rosmsg_complex_deref_reconstruct(v, msgs) for k, v in it.items()}; return _d
    elif isinstance(it, tuple): _t = tuple(_rosmsg_complex_deref_reconstruct(k, msgs) for k in it); return _t
    elif isinstance(it, set): _s = set (_rosmsg_complex_deref_reconstruct(k, msgs) for k in it); return _s
    elif isinstance(it, RosMsgPlaceholder): return msgs[it.idx]
    try: iter(it)
    except: return it
    answer = []
    for e in it:
        if e is cli.yieldT: return answer
        answer.append(_rosmsg_complex_deref_reconstruct(e, msgs))
    return answer


class RosMsgComplex:
    def __init__(self, data):
        """An attempt to speed up serialization of ROS messages. Normally, I'd do this::

            [msg1, msg2, ...] | deref() | aS(dill.dumps) | file("...")

        But this is a little inefficient as the process of writing to and reading
        from a temp bag file is not that fast. So this kinda bunches up all
        messages, write them into a single bag file, and have clever mechanism to
        reconstruct the structure. Turns out lots of messages can bog down the
        system. This does reduce load time by 2 times and disk size by 3 times.
        So it's effective, but just not wildly effective.

        This is not exposed automatically on the docs cause I don't feel like it's
        fast enough to justify that, but I couldn't just delete this.

        :param data: arbitrarily nested structure that may contain ROS messages"""
        self.data = data

    def __getstate__(self):
        fn = _rosmsg_getFn2()
        with rosbag.Bag(fn, "w") as bag:
            # Messages are lifted out of the structure and each written under its own topic
            msgs = {}; struct = _rosmsg_complex_deref_replace(self.data, k1lib.AutoIncrement(prefix="/_rosmsg_"), msgs)
            for k, v in msgs.items(): bag.write(k, v)
        with open(fn, "rb") as f: raw = f.read()
        res = {"struct": dill.dumps(struct), "raw": raw}; os.remove(fn); return res

    def __setstate__(self, d):
        fn = _rosmsg_getFn2()
        with open(fn, "wb") as f: f.write(d["raw"])
        # Close the bag before its backing file is removed
        # NOTE(review): values here are whole bag messages (topic, message, timestamp), not
        # the bare ``.message`` that RosMsg.__setstate__ extracts - confirm which is intended
        with rosbag.Bag(fn) as bag: msgs = {x.topic:x for x in bag.read_messages()}
        # "struct" was stored with dill.dumps(), so it has to be unpickled first. Handing
        # the raw bytes to the reconstruct step returns them unchanged (bytes is atomic).
        self.data = _rosmsg_complex_deref_reconstruct(dill.loads(d["struct"]), msgs); os.remove(fn)
class deref(BaseCli):
    def __init__(self, maxDepth=float("inf"), igT=True):
        """Recursively converts any iterator into a list.
        Example::

            iter(range(5)) # returns something like "<range_iterator at 0x7fa8c52ca870>"
            iter(range(5)) | deref() # returns [0, 1, 2, 3, 4]
            [2, 3, yieldT, 6] | deref() # returns [2, 3], yieldT stops things early

        You can also specify a ``maxDepth``::

            iter([range(3)]) | deref(0) # returns something like "<list_iterator at 0x7f810cf0fdc0>"
            iter([range(3)]) | deref(1) # returns [range(3)]
            iter([range(3)]) | deref(2) # returns [[0, 1, 2]]

        There are a few classes/types that are considered atomic, and :class:`deref`
        will never try to iterate over it. If you wish to change it, do something like::

            settings.cli.atomic.deref = (int, float, ...)

        :param maxDepth: maximum depth to dereference. Starts at 0 for not doing anything at all
        :param igT: short for "ignore tensor". If True, then don't loop over
            :class:`torch.Tensor` and :class:`numpy.ndarray` internals"""
        super().__init__(); self.igT = igT
        # ``depth`` is the current recursion level while a ``__ror__`` call is in flight
        self.maxDepth = maxDepth; self.depth = 0
        if hasTorch: self.arrayType = (torch.Tensor, np.ndarray) if k1lib.settings.startup.or_patch.numpy else torch.Tensor
        else: self.arrayType = (np.ndarray,) if k1lib.settings.startup.or_patch.numpy else ()

    def _typehint(self, inp, depth=float("inf")):
        if depth == 0: return inp
        if depth == float("inf"): depth = self.maxDepth
        if isinstance(inp, type) and issubclass(inp, atomic.deref): return inp
        if isinstance(inp, tArrayTypes):
            if self.igT: return inp
            if inp.rank is None: return tList(tAny())
            if inp.rank == 1:
                if isinstance(inp, tTensor):
                    return tList(type(torch.tensor(3, dtype=inp.child).item()))
                if isinstance(inp, tNpArray):
                    return tList(type(np.array(3, dtype=inp.child).item()))
            return tList(self._typehint(inp.item(), depth-1))
        if isinstance(inp, tListIterSet):
            return tList(self._typehint(inp.child, depth-1))
        if isinstance(inp, tCollection):
            return tCollection(*(self._typehint(e, depth-1) for e in inp.children))
        return tAny()

    def __ror__(self, it:Iterator[Any]) -> List[Any]:
        """Dereferences ``it``, recursing into containers and iterables up to ``maxDepth`` levels."""
        if self.depth >= self.maxDepth: return it
        elif isinstance(it, np.number): return it.item()
        elif isinstance(it, atomic.deref): return it
        elif isinstance(it, self.arrayType):
            if self.igT: return it
            if len(it.shape) == 0: return it.item()
            # arrays with at least 1 dimension fall through to the generic loop below
        elif isinstance(it, (dict, tuple, set)):
            # The depth counter is restored in ``finally`` so that an exception
            # raised mid-recursion can't leave this (reusable) instance at the wrong depth
            self.depth += 1
            try:
                if isinstance(it, dict): return {k: self.__ror__(v) for k, v in it.items()}
                if isinstance(it, tuple): return tuple(self.__ror__(k) for k in it)
                return set(self.__ror__(k) for k in it)
            finally: self.depth -= 1
        elif hasRos1 and isinstance(it, genpy.message.Message): return RosMsg(it) # return rosmsg2BagMessage(it)
        try: iter(it)
        except: return it
        self.depth += 1; answer = []
        try:
            for e in it:
                # Stopping early on yieldT used to skip the decrement, leaving
                # ``self.depth`` permanently too high for every later call
                if e is cli.yieldT: return answer
                answer.append(self.__ror__(e))
            return answer
        finally: self.depth -= 1

    def __invert__(self) -> BaseCli:
        """Returns a :class:`~k1lib.cli.init.BaseCli` that makes
        everything an iterator. Not entirely sure when this comes in handy, but it's there."""
        return inv_dereference(self.igT)

    def _jsF(self, meta):
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()
        return f"{fIdx} = ({dataIdx}) => {dataIdx}", fIdx

    @staticmethod
    def js():
        """Deref incoming object and turn them into a js object (NOT json string!).
        Example::

            # returns "[...Array(10).keys()]"
            range(10) | deref.js()

        How does it know to transpile it? Based on the dictionary at
        `settings.cli.kjs.jsonF` and the object's "._jsonF" function. Say you
        have a custom list object, you can do something like this::

            class CustomList:
                def __init__(self): ...
                def _jsonF(self): return "your js string here"

        Or, you can do something like this::

            class CustomList: ...
            settings.cli.kjs.jsonF[CustomList] = lambda obj: "your js string here"

        A variety of data types are included out of the box already for common
        types, view the source code of this method to check them out."""
        jsonF = settings.kjs.jsonF
        jsonF[list] = lambda x: "[" + ", ".join([deref_js(e) for e in x]) + "]"
        jsonF[str] = lambda x: json.dumps(x)
        jsonF[tuple] = jsonF[list]; jsonF[set] = lambda x: "new Set(" + jsonF[list](x) + ")"
        jsonF[type(None)] = lambda x: "null"
        jsonF[np.ndarray] = lambda x: json.dumps(x | deref(igT=False))
        if hasTorch: jsonF[torch.Tensor] = lambda x: json.dumps(x | deref(igT=False))
        jsonF[type(iter(range(10)))] = lambda x: "[" + ", ".join([str(e) for e in x]) + "]"
        jsonF[type((x for x in range(0)))] = jsonF[list]
        jsonF[type({}.keys())] = jsonF[list]; jsonF[type({}.values())] = jsonF[list]
        jsonF[dict] = lambda x: "{" + ", ".join([f"{json.dumps(k)}: {deref_js(v)}" for k,v in x.items()]) + "}"
        jsonF[defaultdict] = jsonF[dict]
        # initializes at runtime, then patches deref.js() to get a faster path!
        deref.js = lambda: cli.aS(deref_js); return deref.js()
def deref_js(obj):
    """Transcribes ``obj`` into a string of JavaScript source.

    Only 2 special cases are hard-coded (bools and numbers), for perf
    considerations. Everything else is pluggable: first the converter
    registered for the exact type of ``obj`` in ``settings.kjs.jsonF``, then
    the object's own ``._jsonF()`` method.

    :raises Exception: if no conversion is available for ``obj``"""
    # bool has to be tested before numbers: it is a numbers.Number too, and str(True) is not valid js
    if isinstance(obj, bool):
        return "true" if obj else "false"
    if isinstance(obj, (numbers.Number, np.number)):
        return str(obj)
    converter = settings.kjs.jsonF.get(type(obj), None)
    if converter:
        return converter(obj)
    if hasattr(obj, "_jsonF"):
        return obj._jsonF()
    raise Exception(f"Don't know how to transcribe object with class {type(obj)}. Either add the serialization function to `settings.cli.kjs.jsonF`, or implement the function `._jsonF()` to your custom class")
class bindec(BaseCli):
    def __init__(self, cats:List[Any], f=None):
        """Binary decodes the input: treats it as a bit mask and picks out the
        categories whose bit is set, lowest bit first.
        Example::

            # returns ['a', 'c']
            5 | bindec("abcdef")
            # returns 'a,c'
            5 | bindec("abcdef", join(","))

        :param cats: categories
        :param f: transformation function of the selected elements. Defaulted to
            :class:`~k1lib.cli.conv.toList`, but others like :class:`join` is useful too"""
        self.cats = cats
        self.f = f or cli.toList()

    def __ror__(self, it):
        # Binary digits of the input, least significant first, to line up with cats[0], cats[1], ...
        bits = bin(int(it))[2:][::-1]
        chosen = (e for i, e in zip(bits, self.cats) if i == '1')
        return chosen | self.f
# Registers the cli setting ``smooth`` with a default of 10
settings.add("smooth", 10, "default smooth amount, used in utils.smooth")
class smooth(BaseCli):
    def __init__(self, consecutives=None, windowing=False):
        """Smoothes out the input stream by averaging groups of consecutive
        elements. Literally just a shortcut for::

            batched(consecutives) | toMean().all()

        Example::

            # returns [4.5, 14.5, 24.5]
            range(30) | smooth(10) | deref()

        Smoothing over :class:`torch.Tensor` or :class:`numpy.ndarray` will be much faster::

            # returns torch.Tensor with shape (2)
            torch.randn(10, 3, 4) | smooth(4)

        The default consecutive value is in ``settings.cli.smooth``. This is
        useful if you are smoothing over multiple lists at the same time, like this::

            # can change a single smooth value temporarily here, and all sequences will be smoothed in the same way
            with settings.cli.context(smooth=5):
                x = list(np.linspace(-2, 2, 50))
                y = x | apply(op()**2) | deref()
                plt.plot(x | smooth() | deref(), y | smooth() | deref())

        :param consecutives: if not defined, then used the value inside ``settings.cli.smooth``
        :param windowing: if True, groups elements with ``cli.window`` instead of ``cli.batched``"""
        n = consecutives or settings.smooth
        if windowing:
            self.b = cli.window(n)
        else:
            self.b = cli.batched(n)
        self.consecutives = consecutives
        self.windowing = windowing

    def _all_array_opt(self, it, level):
        perGroup = self.b | cli.toMean().all()
        return it | perGroup.all(level)

    def __ror__(self, it):
        return init.dfGuard(it) | self.b | cli.toMean().all()

    def _jsF(self, meta):
        if self.windowing:
            raise Exception(f"._jsF() does not support windowing in smooth() yet")
        fIdx = init._jsFAuto()
        dataIdx = init._jsDAuto()
        return f"{fIdx} = ({dataIdx}) => {dataIdx}.smooth({cli.kjs.v(self.consecutives)})", fIdx
def _f():
    return None
# the type of code objects, taken from a throwaway function; used for isinstance checks
_code = _f.__code__.__class__
def disassemble(f=None):
    """Disassembles anything piped into it.
    Normal usage::

        def f(a, b):
            return a**2 + b
        # both of these print out disassembled info
        f | disassemble()
        disassemble(f)

        # you can pass in lambdas
        disassemble(lambda x: x + 3)

        # or even raw code
        "lambda x: x + 3" | disassemble()

    Prints the ``co_*`` attributes of the code object, then its bytecode,
    then (indented and prefixed with ``|``) the same report for every code
    object nested inside ``co_consts``.

    :param f: function, code object or source string. If None, returns a
        cli that disassembles whatever is piped into it
    :raises RuntimeError: if ``f`` can't be resolved to a code object"""
    if f is None: return cli.aS(disassemble)
    c = compile(f, "", "exec") if isinstance(f, str) else f
    # functions/methods expose their code object via __code__; code objects
    # themselves don't have that attribute, so fall through unchanged
    c = getattr(c, "__code__", c)
    if not isinstance(c, _code): raise RuntimeError(f"`{c}` is not a code object/function/class method/string code")
    for field in ("co_argcount", "co_cellvars", "co_consts", "co_filename", "co_firstlineno",
                  "co_flags", "co_freevars", "co_kwonlyargcount"):
        print(f"{field}: {getattr(c, field)}")
    # co_lnotab is a bytes object, so it's rendered as space-separated integers.
    # NOTE(review): co_lnotab is deprecated in recent Python versions - confirm
    # before upgrading the interpreter
    print(f"co_lnotab: {c.co_lnotab | cli.apply(str) | join(' ')}")
    for field in ("co_name", "co_names", "co_nlocals", "co_posonlyargcount", "co_stacksize", "co_varnames"):
        print(f"{field}: {getattr(c, field)}")
    print(f"Disassembly:"); dis.disassemble(c)
    # recurse into nested code objects, capturing their output so that it
    # can be indented and prefixed below
    with k1lib.captureStdout() as out:
        c.co_consts | cli.filt(lambda x: "code" in str(type(x))) | cli.tee(lambda _: "----------------------- inner code object -----------------------\n") | cli.apply(disassemble) | cli.ignore()
    out() | cli.filt(cli.op().strip() != "") | cli.apply("|" + cli.op()) | cli.indent() | cli.stdout()
def shortName(s):
    """Returns the part of path ``s`` after the last ``os.sep`` (all of
    ``s`` if there is no separator, and an empty string if ``s`` ends
    with one)."""
    return s.rsplit(os.sep, 1)[-1]
def tree(fL=10, dL=10, depth=float("inf"), ff:Callable[[str], bool]=(lambda s: True), df:Callable[[str], bool]=(lambda s: True)):
    """Recursively gets all files and folders. Output format might be a bit
    strange, so this is mainly for visualization. Example::

        "." | tree() | deref()

    This is way less powerful and structured than clis from the module
    :mod:`k1lib.cli.ktree`. Check that out. This cli is mainly for
    backwards compatibility.

    :param fL: max number of file per directory included in output
    :param dL: max number of child directories per directory included in output
    :param depth: explore depth
    :param ff: optional file filter function
    :param df: optional directory filter function"""
    isFile = os.path.isfile
    def nameAndPath(path):
        return [shortName(path), path]
    def explore(path):
        # stop recursing once the depth budget is used up
        if depth > 0: return path | tree(fL, dL, depth-1, ff, df)
        return []
    # child folders: {short name: nested tree}
    processFolders = cli.apply(nameAndPath) | cli.apply(explore, 1) | cli.toDict()
    # files: set of short names
    files = cli.filt(isFile) | cli.filt(ff) | cli.head(fL) | cli.apply(shortName) | cli.aS(set)
    folders = ~cli.filt(isFile) | cli.filt(df) | cli.head(dL) | processFolders
    return cli.ls() | ~cli.sortF(isFile) | (files & folders)
class lookup(BaseCli):
    def __init__(self, d:dict, col:int=None, fill=None, mode:str="error"):
        """Looks up items from a dictionary/object.
        Example::

            d = {"a": 3, "b": 5, "c": 52}
            "abcca"  | lookup(d) | deref() # returns [3, 5, 52, 52, 3]
            "abccad" | lookup(d) | deref() # raises Exception, as key "d" does not exist
            "abccad" | lookup(d, fill="(not found)") | deref() # returns [3, 5, 52, 52, 3, '(not found)'], mode automatically switched to "fill"
            "abccad" | lookup(d, mode="fill") | deref()  # returns [3, 5, 52, 52, 3, None]. Do this when you really want to return None
            "abccad" | lookup(d, fill=input) | deref()   # returns [3, 5, 52, 52, 3, 'd'], mode automatically switched to "input"
            "abccad" | lookup(d, mode="input") | deref() # returns [3, 5, 52, 52, 3, 'd'], similar to above
            "abccad" | lookup(d, mode="rm") | deref()    # returns [3, 5, 52, 52, 3], removing the unknown element
            [range(5), "abcca"] | transpose() | lookup(d, 1) | deref() # returns [[0, 3], [1, 5], [2, 52], [3, 52], [4, 3]]

        The ``mode`` param needs a little explaining. It specifies what
        should happen when an element is not found within the given
        dictionary. There are 4 modes in total:

        - error: if ``.fill`` is None, then throws an error. If ``.fill`` is
          specified, then this acts like mode "fill" instead (or like mode
          "input" if ``.fill`` is the builtin ``input``)
        - input: returns whatever the input element is
        - rm: removes (aka ignore) the element
        - fill: returns the arg ``.fill``

        :param d: any object that can be sliced with the inputs. Modes other
            than "error" call ``d.get(...)``
        :param col: if None, lookup on each row, else lookup a specific column only
        :param fill: fill value for elements that are not in the provided dictionary. Explained more above
        :param mode: "error", "input", "rm", "fill". Explained more above"""
        self.d = d; self.col = col; self.fill = fill
        if mode == "error": # override .mode so that it's backwards compatible
            if fill is input: mode = "input"; fill = None
            elif fill is not None: mode = "fill"
        self.mode = mode
        # unique marker for "key not found" in "rm" mode, filtered out afterwards
        self.rmSentinel = rmSentinel = object()
        def strict(e): return d[e]
        def passthrough(e): return d.get(e, e)
        def dropping(e): return d.get(e, rmSentinel)
        def filling(e): return d.get(e, fill)
        for name, candidate in (("error", strict), ("input", passthrough), ("rm", dropping), ("fill", filling)):
            if mode == name: f = candidate; break
        else: raise Exception("Invalid mode. Only 'error', 'input', 'rm' and 'fill' are allowed")
        self.f = f
        def fa(it, col):
            looked = it | cli.apply(f, col)
            if mode != "rm": return looked
            return looked | cli.filt(lambda x: x is not rmSentinel, col)
        self.fa = fa
    def _typehint(self, inp):
        """Infers the output type from the values of the lookup dictionary."""
        t = inferType(list(self.d.values()))
        if isinstance(t, tListIterSet): return tIter(t.child)
        if isinstance(t, tCollection): return tIter(tLowest(*t.children))
        return tIter(tAny())
    def __ror__(self, it):
        """Performs the lookup on ``it``. Has a dedicated path for pandas
        DataFrames when a column is specified."""
        col = self.col
        isFrame = hasPandas and isinstance(it, pd.DataFrame)
        if not isFrame: return self.fa(it, col)
        if col is None: return self.fa(init.dfGuard(it), col)
        f = self.f; rmSentinel = self.rmSentinel
        colName = list(it)[col]
        c = [f(e) for e in it[colName]]
        it = it.replaceCol(list(it)[col], c)
        if self.mode != "rm": return it
        # keep only the rows whose key was found
        return it.iloc[[i for i, e in enumerate(c) if e is not rmSentinel]]
    def _jsF(self, meta):
        """Returns ``(js source, function name)`` for this lookup. Mode
        "error" is not supported on the JS side."""
        if self.mode not in ("input", "rm", "fill"): raise Exception(f"lookup()._jsF() only supports modes 'input', 'rm' and 'fill'. Either specify a mode, or a default fill value")
        fIdx = init._jsFAuto()
        dictIdx = f"{init._jsDAuto()}_{round(time.time())}"
        dataIdx = init._jsDAuto()
        return f"//k1_moveOutStart\n{dictIdx} = {json.dumps(self.d)}; //k1_moveOutEnd\n{fIdx} = ({dataIdx}) => {dataIdx}.lookup({dictIdx}, {cli.kjs.v(self.col)}, {cli.kjs.v(self.fill)}, `{self.mode}`)", fIdx
_sorted = sorted # presumably kept because `sorted` is shadowed by a parameter name in lookupRange.__init__ - TODO confirm
class lookupRange(BaseCli):
    def __init__(self, ranges, col:int=None, sorted=True, fill=None, mode="error"):
        """Looks up values within some range.
        Example::

            ranges = [[2, 3, "a"], [4, 5, "b"], [6, 7, "c"]]
            vs = [1, 1.5, 2, 2.5, 3, 3.5, 4, 4.5, 5, 5.5]
            vs | lookupRange(ranges, mode="error") | deref() # raises an exception cause it can't find "1" in any ranges
            vs | lookupRange(ranges, mode="fill")  | deref() # returns [None, None, 'a', 'a', None, None, 'b', 'b', None, None]
            vs | lookupRange(ranges, mode="rm")    | deref() # returns ['a', 'a', 'b', 'b']
            vs | lookupRange(ranges, mode="input") | deref() # returns [1, 1.5, 'a', 'a', 3, 3.5, 'b', 'b', 5, 5.5]

            vs = list(zip([1, 1.5, 2, 2.5, 3, 3.5, 4, 4.5, 5, 5.5], "abcdefghij"))
            vs | lookupRange(ranges, 0, mode="rm") | deref() # returns [['a', 'c'], ['a', 'd'], ['b', 'g'], ['b', 'h']]

        So, ``ranges`` should be a table with 3 columns: start, stop and value.
        This cli will search across all ranges, and if the input iterator has
        values within a single range, it will yield that range's value. The
        exact comparison expression is "start <= input < stop".

        Internally, there're 2 implementations:

        First implementation assumes the ranges are not overlapping, activated
        by "sorted=True". This will assume the ranges are sorted based on the
        start values, then it searches for the value using binary search.
        Time complexity is O(n*log(m)), where n is the input size, m is the
        ranges's length

        Second implementation doesn't assume the ranges are not overlapping,
        activated by "sorted=False". This won't sort the ranges, and searches
        for the value using linear search, yielding the first range that
        contains the value. Time complexity is O(n*m)

        See also: :class:`lookup`

        :param ranges: table of size (N, 3), with each row (start, stop, value)
        :param col: column to act upon
        :param sorted: if True, use binary search, else use linear search. Explained more above
        :param fill: if specified, and if no ranges contain the value, then
            yield this value instead. Passing the builtin ``input`` switches
            to mode "input", like in :class:`lookup`
        :param mode: explained above. See :class:`lookup` as well
        :raises Exception: if ``mode`` is not one of the 4 supported modes"""
        # binary search needs random access and a length, so materialize
        # anything that can't be sliced
        try: ranges[:]; len(ranges)
        except Exception: ranges = ranges | deref(2)
        if mode == "error": # override .mode so that it's backwards compatible
            # `input` has to be checked first: it's also "not None", so the
            # other order would make this branch unreachable
            if fill is input: mode = "input"
            elif fill is not None: mode = "fill"
        self.ranges = ranges; self.col = col; self.sorted = sorted; self.fill = fill; self.mode = mode
        if mode not in ("error", "rm", "fill", "input"): raise Exception(f".mode can only be 'error', 'rm', 'fill' or 'input'")
    @staticmethod
    def _bisect(ranges, v, default):
        """Binary searches ``ranges`` for the row with start <= v < stop,
        returning that row's value, or ``default`` if there is none."""
        start = 0; end = len(ranges)-1
        while start <= end:
            mid = round((start + end)/2)
            r = ranges[mid]
            if r[0] <= v < r[1]: return r[2]
            if v < r[0]: end = mid-1
            else: start = mid+1
        return default
    def __ror__(self, it):
        """Yields the looked up value (or the row with column ``col``
        replaced by it) for every input element, handling misses according
        to ``self.mode``.

        :raises KeyError: in mode "error", if an element is not inside any range"""
        ranges = self.ranges; col = self.col; fill = self.fill; mode = self.mode
        missing = object() # marks "no range contains this value", as None is a legit range value
        it = init.dfGuard(it)
        if self.sorted:
            bisect = self._bisect
            def find(v): return bisect(ranges, v, missing)
        else:
            def find(v): return next((vv for x,y,vv in ranges if x <= v < y), missing)
        for row in it:
            v = row if col is None else row[col]
            e = find(v)
            if e is missing:
                if mode == "rm": continue
                if mode == "error": raise KeyError(f"Can't find element {v} in any ranges")
                e = fill if mode == "fill" else v # only "fill" and "input" are left
            if col is None: yield e
            else: row = list(row); row[col] = e; yield row
class getitems(BaseCli):
    def __init__(self, *fields, default=None):
        """Basically [input[x] for x in fields], with a fallback for fields
        that can't be retrieved.
        Example::

            # returns [3, 1, None]
            {"a": 1, "b": 2, "c": 3} | getitems("c", "a", "d")
            # returns [3, 1, '']
            {"a": 1, "b": 2, "c": 3} | getitems("c", "a", "d", default="")

        :param fields: keys/indexes to extract, in output order
        :param default: value used for any field whose lookup fails"""
        self.fields = fields; self.default = default
    def __ror__(self, d):
        """Returns a list with ``d[f]`` for every field ``f``, substituting
        ``self.default`` wherever the lookup raises."""
        ans = []; default = self.default
        for f in self.fields:
            # any lookup failure (missing key, bad index, unsliceable input)
            # means "use the default", but don't swallow KeyboardInterrupt/SystemExit
            try: ans.append(d[f])
            except Exception: ans.append(default)
        return ans
class backup(BaseCli):
    def __init__(self):
        """Backs up a file/folder.
        Example::

            "some/folderOrFile" | backup()
            "some/folderOrFile" | backup.restore()

        Really straightforward. Uses bash internally to copy files
        recursively, so not available on Windows."""
        pass
    def __ror__(self, it):
        """Copies path ``it`` to ``it + ".backup"``, replacing any previous backup."""
        import shlex # local import: only this class needs it
        path = os.path.expanduser(it)
        # quote properly, so that paths containing quotes, spaces or shell
        # metacharacters can't break (or inject into) the command
        src = shlex.quote(path); dst = shlex.quote(f"{path}.backup")
        None | cli.cmd(f"rm -rf {dst}") | cli.ignore()
        None | cli.cmd(f"cp -r {src} {dst}") | cli.ignore()
    @staticmethod
    def restore():
        """Returns a cli that replaces the piped in path with its ``.backup`` copy."""
        def inner(it):
            import shlex # local import: only this class needs it
            path = os.path.expanduser(it)
            dst = shlex.quote(path); src = shlex.quote(f"{path}.backup")
            None | cli.cmd(f"rm -rf {dst}") | cli.ignore()
            None | cli.cmd(f"cp -r {src} {dst}") | cli.ignore()
        return cli.aS(inner)
sketch_interceptor = {} # maps pyplot functions to callables returning the equivalent JS statement, used by sketch._jsF()
class sketch(BaseCli):
    _jsF_ctxIdx = None # name of the JS context variable while sketch._jsF() is transpiling, None otherwise
    def __init__(self, transforms:List[Callable]=[], titles:List[str]=None, im:bool=False, ncols:int=None, n:int=None, axes:int=None):
        """Convenience tool to plot multiple matplotlib plots at the same
        time, while still keeping everything short and in 1 line. For this
        example, we're trying to plot x^1, x^2, ..., x^8 on 2 separate plots,
        one left one right. The left will have x^1 till x^4, the right will
        have x^5 to x^8.

        How you would do this normally::

            x = np.linspace(-2, 2); exps = range(1, 9)
            fig, axes = plt.subplots(1, 2, figsize=(10, 4))
            # simplest solution
            plt.sca(axes[0]); plt.plot(x, x**1); plt.plot(x, x**2); plt.plot(x, x**3); plt.plot(x, x**4); plt.legend([1, 2, 3, 4]); plt.xlabel("x axis")
            # solution using a little bit of cli
            plt.sca(axes[1]); range(5, 9) | apply(lambda a: [x, x**a]) | ~apply(plt.plot) | ignore(); plt.legend([5, 6, 7, 8]); plt.xlabel("x axis")

        But this is long, and I'm incredibly lazy to write it all out. So
        here's how it's going to work using this cli::

            # plotting the first 4 lines only, in a single plot. Should be familiar and make sense to you before moving on
            exps | apply(lambda a: [x, x**a]) | batched(4) | item() | ~apply(plt.plot) | ignore()
            # plotting 8 lines across 2 plots. Simplest example using sketch(). It kinda captures clis after it and use it to plot each plot
            exps | apply(lambda a: [x, x**a]) | batched(4) | (sketch() | ~apply(plt.plot))
            # same as above, but adding a grid and x axis label to all plots. Transformation functions can be anything you would
            # put inside a normal cli (`plt` will be passed as argument): string code, op()-capture, lambda functions, other cli tools
            transforms = ["x.grid(True)", op().xlabel("x axis"), lambda x: x.ylabel("y axis")]
            exps | apply(lambda a: [x, x**a]) | batched(4) | (sketch(transforms) | ~apply(plt.plot))
            # same as above, but adding legends. [x, x**a] will eventually be directed to ~apply(plt.plot), while f"x^{a}" will be directed to aS(plt.legend)
            exps | apply(lambda a: [[x, x**a], f"x^{a}"]) | batched(4) | (sketch() | transpose() | ~apply(plt.plot) + iden() | deref() | rItem(1) | aS(plt.legend)) | deref()

        Last line will generate this plot:

        .. image:: ../images/probScale.png

        Is it worth the extra confusion? Afterall, it just saves you 2-3
        lines of code. To me, it is worth it, because you can quickly change
        styles (add a grid, make y axis log)

        See also: :class:`~k1lib.cli.output.plotImgs`

        Check out a gallery of more examples at `kapi/9-mpl <https://mlexps.com/kapi/9-mpl/>`_.

        :param transforms: transform functions to be run when drawing every plot. ``plt`` (aka ``matplotlib.pyplot``) will be passed in
        :param titles: if specified, use these titles for each plot. Kinda hacky I have to admit
        :param im: if True, returns a PIL image and closes the sketch, else return nothing but still have the sketch open
        :param ncols: if specified, will sketch with this number of columns
        :param n: if specified, use this number of sketch instead of figuring out automatically
        :param axes: if specified, forgo calculating #axes and initialization altogether and just use the provided axes"""
        super().__init__(capture=True); self.titles = titles; self.im = im
        # the default list is never mutated: a fresh list is always built here
        self.transforms = [cli.fastF(t) for t in transforms]; self.ncols = ncols; self.n = n; self.axes = axes
    def __ror__(self, it):
        """Draws every element of ``it`` on its own axes, by piping it
        through the captured clis.

        :return: an image if ``.im`` is set, else the unused axes if ``.n`` is set, else None"""
        it = list(it); n = self.n or len(it); s = self.capturedSerial; transforms = self.transforms
        ncols = self.ncols or math.ceil(n**0.5); nrows = math.ceil(n/ncols)
        # "is not None" instead of truthiness: a numpy array of axes can't be used in a boolean context
        if self.axes is not None: axes = self.axes
        else:
            fig, axes = plt.subplots(nrows, ncols, figsize=(ncols*5, nrows*4))
            if nrows*ncols == 1: axes = [axes] # plt.subplots() returns a bare Axes in this case
        # 2d grid of axes -> flat sequence, so that it can be zipped with the data below
        if axes | cli.shape() | cli.shape(0) > 1: axes = axes.flatten()
        i = -1 # so that "axes[i+1:]" below means "all axes" if nothing gets plotted
        for i, [ax, e, title] in enumerate(zip(axes, it, self.titles or ("" | cli.repeat()))):
            plt.sca(ax); e | s | cli.deref()
            if title: plt.title(title)
            for trans in transforms: trans(plt)
        # removes leftover axes of the grid, unless the user asked for a specific amount
        if self.n is None: axes[i+1:] | cli.op().remove().all() | cli.deref(); plt.tight_layout()
        if self.im: return plt.gcf() | cli.toImg()
        if self.n: return axes[i+1:]
    def _jsF(self, meta):
        """Transpiles this sketch to JS, returning ``(js source, function name)``.

        Captured clis are transpiled while ``sketch._jsF_ctxIdx`` is set, and
        transforms are traced by calling them with a stand-in for ``plt``
        that looks functions up in ``sketch_interceptor``.

        :raises Exception: if ``.n`` or ``.axes`` is specified, or if a
            transform uses a pyplot function that has no JS equivalent"""
        if self.n: raise Exception("sketch()._jsF() doesn't support .n parameter yet")
        if self.axes is not None: raise Exception("sketch()._jsF() doesn't support .axes parameter yet")
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto(); ctxIdx = init._jsDAuto()
        sketch._jsF_ctxIdx = ctxIdx
        try: # always clear the class-level context, even if transpiling fails
            # generate all child functions here
            header, _fIdx, _async = k1lib.kast.asyncGuard(self.capturedSerial._jsF(meta))
            # then generate all transforms here, using a tracing compiler
            tfStmts = ""
            if len(self.transforms) > 0:
                class Interceptor:
                    def __getattr__(self, attr):
                        if getattr(plt, attr) not in sketch_interceptor: raise Exception(f"Transpiling function `plt.{attr}` is not supported at the moment")
                        return lambda *args, **kwargs: sketch_interceptor[getattr(plt, attr)](*args, **kwargs)
                tfStmts = self.transforms | cli.apply(cli.init.fastF) | cli.op()(Interceptor()).all() | cli.join("; ")
        finally: sketch._jsF_ctxIdx = None
        return f"""\
{ctxIdx} = null;\n{header}
{fIdx} = async ({dataIdx}) => {{ // dataIdx should have
    const ctx = []; // this is the object that will be sent to the rendering server!
    const titles = {json.dumps(self.titles)} ?? Array({dataIdx}.length);
    for (const i of [...Array({dataIdx}.length).keys()]) {{
        {ctxIdx} = [];
        // actually executing function and plotting function downstream
        {'await ' if _async else ''}{_fIdx}({dataIdx}[i]);
        if (titles[i]) {ctxIdx}.push(["title", titles[i]]);
        // inject all transforms here
        {tfStmts};
        ctx.push({ctxIdx}); {ctxIdx} = null;
    }}
    // console.log(ctx);
    // console.log(JSON.stringify(ctx));
    const res = await (await fetch("https://local.mlexps.com/routeServer/kapi_9-mpl", {{
        method: "POST",
        body: JSON.stringify({{ "ctx": JSON.stringify(ctx) }}),
        headers: {{ "Content-Type": "application/json" }}
    }})).json()
    if (res.success) {{
        const base64 = res.data;
        console.log("mpl fetched");
        return `<img src="data:image/jpg;base64, ${{base64}}" />`
    }} else {{ throw new Error(res.reason); }}
    // return ctx;
}}""", fIdx
def _jsF_plt_ctxGuard():
    """Returns ``sketch._jsF_ctxIdx``, raising an Exception if it is None
    (the error message asks the user to wrap plotting calls in sketch())."""
    if sketch._jsF_ctxIdx is None: raise Exception("Have to wrap any plotting operations around sketch(). So, transform your code from `data | (toJsFunc() | ~aS(plt.plot))` into `[data] | (toJsFunc() | (sketch() | ~aS(plt.plot)))`")
    return sketch._jsF_ctxIdx
# rebinds the module-level name `plt` to matplotlib.pyplot if it can be imported
try: import matplotlib.pyplot as plt; hasMpl = True
except: hasMpl = False
if hasMpl:
    # Two registries are filled in below:
    # - settings.kjs.jsF[func]: builders returning (js source, function name), where arguments are passed on the JS side
    # - sketch_interceptor[func]: callables taking Python-side arguments and returning a single JS statement
    # Both push ["command", ...args] entries onto the context variable returned by _jsF_plt_ctxGuard()
    def _jsF_plt_plot(meta, c=None):
        fIdx = init._jsFAuto(); xIdx = init._jsDAuto(); yIdx = init._jsDAuto(); ctxIdx = _jsF_plt_ctxGuard()
        return f"""\
{fIdx} = ({xIdx}, {yIdx}=null) => {{
    if (!{yIdx}) {{ // handle only xIdx is available case
        {yIdx} = {xIdx};
        {xIdx} = [...Array({yIdx}.length).keys()];
    }}
    {ctxIdx}.push(["plot", {xIdx}, {yIdx}]);
}}""", fIdx
    settings.kjs.jsF[plt.plot] = _jsF_plt_plot
    def _jsF_plt_title(meta): # version that passes args in js side
        fIdx = init._jsFAuto(); titleIdx = init._jsDAuto(); ctxIdx = _jsF_plt_ctxGuard()
        return f"""{fIdx} = ({titleIdx}) => {{ {ctxIdx}.push(["title", {titleIdx}]); }}""", fIdx
    settings.kjs.jsF[plt.title] = _jsF_plt_title # below is version that passes args in python side, returns statement, instead of (header, fIdx) like usual
    # NOTE(review): unlike the other interceptors, the title is embedded in a JS template literal instead of going through cli.kjs.v() - confirm this is intended
    sketch_interceptor[plt.title] = lambda title: f"""{_jsF_plt_ctxGuard()}.push(["title", `{title}`])"""
    def _jsF_plt_grid(meta):
        fIdx = init._jsFAuto(); tfIdx = init._jsDAuto(); ctxIdx = _jsF_plt_ctxGuard()
        return f"""{fIdx} = ({tfIdx}) => {{ {ctxIdx}.push(["grid", {tfIdx}]); }}""", fIdx
    settings.kjs.jsF[plt.grid] = _jsF_plt_grid; sketch_interceptor[plt.grid] = lambda tf=True: f"""{_jsF_plt_ctxGuard()}.push(["grid", {cli.kjs.v(tf)}])"""
    def _jsF_plt_legend(meta, framealpha=1):
        fIdx = init._jsFAuto(); legendIdx = init._jsDAuto(); ctxIdx = _jsF_plt_ctxGuard()
        return f"""{fIdx} = ({legendIdx}) => {{ {ctxIdx}.push(["legend", {legendIdx}, {framealpha}]); }}""", fIdx
    settings.kjs.jsF[plt.legend] = _jsF_plt_legend; sketch_interceptor[plt.legend] = lambda legend=None, framealpha=1: f"""{_jsF_plt_ctxGuard()}.push(["legend", {cli.kjs.v(legend)}, {cli.kjs.v(framealpha)}])"""

    # these functions only have the Python-side-arguments version
    sketch_interceptor[plt.xlim] = lambda left=None, right=None: f"""{_jsF_plt_ctxGuard()}.push(["xlim", {cli.kjs.v(left)}, {cli.kjs.v(right)}])"""
    sketch_interceptor[plt.ylim] = lambda bottom=None, top=None: f"""{_jsF_plt_ctxGuard()}.push(["ylim", {cli.kjs.v(bottom)}, {cli.kjs.v(top)}])"""
    sketch_interceptor[plt.xscale] = lambda scale: f"""{_jsF_plt_ctxGuard()}.push(["xscale", {cli.kjs.v(scale)}])"""
    sketch_interceptor[plt.xlabel] = lambda label: f"""{_jsF_plt_ctxGuard()}.push(["xlabel", {cli.kjs.v(label)}])"""
    sketch_interceptor[plt.ylabel] = lambda label: f"""{_jsF_plt_ctxGuard()}.push(["ylabel", {cli.kjs.v(label)}])"""
import numbers, sys; from collections import deque
class syncStepper(BaseCli):
    def __init__(self, col=0, sort=False):
        """Steps forward all streams at a time, yielding same results from min to max.
        That's a bit vague, so let's see an example::

            a = [["a", 1], ["b", 7 ], ["c", 4], ["e", 6]]
            b = [["b", 5], ["c", 1 ], ["d", 3], ["f", 5]]
            c = [["a", 2], ["c", -4], ["d", 9], ["e", 4]]

            [a, b, c] | syncStepper() | deref()        # sync-step by the 1st column
            [a, b, c] | syncStepper(1, True) | deref() # sync-step by the 2nd column. Have to sort it explicitly

        The first line returns this::

            [[['a', 1], None, ['a', 2]],
             [['b', 7], ['b', 5], None],
             [['c', 4], ['c', 1], ['c', -4]],
             [None, ['d', 3], ['d', 9]],
             [['e', 6], None, ['e', 4]],
             [None, ['f', 5], None]]

        The second line returns this::

            [[None, None, ['c', -4]],
             [['a', 1], ['c', 1], None],
             [None, None, ['a', 2]],
             [None, ['d', 3], None],
             [['c', 4], None, ['e', 4]],
             [None, ['b', 5], None],
             [['e', 6], None, None],
             [['b', 7], None, None],
             [None, None, ['d', 9]]]

        ``col`` can be None, but it's quite a strange use case::

            [['a', 'b', 'c', 'e'], ['b', 'c', 'd', 'f'], ['a', 'c', 'd', 'e']] | syncStepper(None) | deref()

        It returns this::

            [[['a'], None, ['a']],
             [['b'], ['b'], None],
             [['c'], ['c'], ['c']],
             [None, ['d'], ['d']],
             [['e'], None, ['e']],
             [None, ['f'], None]]

        As you can see, for each line, it kinda yields elements with the same
        column. If that element doesn't exist, it'll just put None there. This
        expects the input streams are sorted at the column of interest. If
        they are not, specify ``sort=True``.

        It has roughly the same vibe as :class:`~k1lib.cli.structural.groupBy`,
        in that it groups everything by a specific column. The main difference
        here is that you can sync-step them line-by-line, loading very little
        into memory, so you can run this on giant datasets and not have to
        worry about running out of memory.

        With k streams each having n elements, you should expect memory
        complexity to be O(k), and the time complexity to be O(n*k^2/2). That
        k^2 term is kinda worrying, but in most use cases, k is small and so
        k^2 can be treated as a constant

        See also: :class:`~k1lib.cli.structural.latch`

        :param col: column where it should compare values and merge them together. Can be None, but that would be quite a weird use case
        :param sort: whether to sort the streams or not. This cli requires it,
            but it's not turned on by default because it's an intensive operation"""
        # col=None means "compare the elements themselves": wrap every element
        # in a list, then compare on column 0
        if col is None: self.col = 0; self.colPreprocess = cli.wrapList().all()
        else: self.col = col; self.colPreprocess = cli.iden()
        self.bank = deque(); self.sentinel = object(); self._sort = sort
    def _append(self, stIdx1, val1, elem1): # append to bank in the correct position
        """Inserts ``[stIdx1, val1, elem1]`` into ``self.bank``, before the
        first entry whose value is >= ``val1``, or at the end if there's none."""
        i = 0; val2 = self.minObj
        for i, [stIdx2, val2, elem2] in enumerate(self.bank):
            if val1 <= val2: break
        if val1 <= val2: self.bank.insert(i, [stIdx1, val1, elem1])
        else: self.bank.append([stIdx1, val1, elem1])
    def _yieldNext(self): # yield the next set of values
        """Collects all leading bank entries that share the smallest value
        into a row (one slot per stream), then replaces them with the next
        element of their streams.

        :return: (row, changed). ``changed`` is False when the bank starts
            with a sentinel entry, i.e. there's nothing left to yield
        :raises Exception: if a stream turns out to be unsorted"""
        n = len(self.sts); res = [None]*n; last = None; hasInit = False; changed = False; bank = self.bank; sentinel = self.sentinel
        for i, [stIdx, val, elem] in enumerate(bank):
            if not hasInit and elem is sentinel: return res, changed
            if last == val or not hasInit: changed = True; res[stIdx] = elem
            elif hasInit: break
            hasInit = True; last = val
        while bank[0][1] == last: # popping the values off
            stIdx, val1, elem1 = bank.popleft(); val2, elem2 = next(self.sts[stIdx])
            if val1 > val2: raise Exception(f"Stream {stIdx} has not been sorted yet! Please sort all streams before passing it into syncStepper")
            self._append(stIdx, val2, elem2)
        return res, changed
    def __ror__(self, sts): # sts = "streams"
        """Yields rows of elements (one slot per stream) sharing the same
        value at the configured column, in ascending order of that value.

        :raises Exception: if the column isn't purely numeric or purely string across the streams"""
        col = self.col; sts = init.dfGuard(sts)
        self.bank = deque() # start from a clean state, so that this cli can be used more than once
        # --------------------- All of this is just to figure out the type of the column dynamically. So painful ---------------------
        samples, sts = sts | self.colPreprocess.all() | cli.apply(cli.peek()) | cli.transpose() | cli.cut(col) + cli.iden() | cli.apply(list)
        if len([e for e in sts if e != []]) == 0: return # no elements to yield at all!
        n_nums = sum([1 if isinstance(e, numbers.Number) else 0 for e in samples])
        n_strs = sum([1 if isinstance(e, str) else 0 for e in samples]); n = len(samples)
        if n_nums*(n-n_nums) + n_strs*(n-n_strs) > 0: raise Exception("The requested column in some of the streams is not purely of numeric or string type, a requirement of syncStepper(). Please fix your data structure and try again.")
        if n_nums + n_strs == 0: raise Exception("The requested column in some of the streams is not of numeric or string type, so can't compare them to sync-step them")
        # n = 3; n_strs = 1
        # smallest and largest possible values for the column's type. Each stream is padded with [maxObj, sentinel] forever
        text = n_strs > 0; self.minObj = "" if text else float("-inf"); self.maxObj = chr(sys.maxunicode) if text else float("inf"); senObj = [self.maxObj, self.sentinel]
        # --------------------- And here's the meat of the cli ---------------------
        sts = sts | (cli.sort(col, not text).all() if self._sort else cli.iden()) | cli.apply(lambda st: [st | cli.apply(lambda elem: [elem[col], elem]), senObj | cli.repeat()] | cli.joinStreams()) | cli.aS(list)
        # seed the bank with the first element of every stream
        sts | cli.apply(next) | cli.insertIdColumn() | ~cli.apply(lambda idx,e: self._append(idx, *e)) | cli.ignore(); self.sts = sts
        while True:
            res, changed = self._yieldNext()
            if not changed: break
            yield res
class zeroes(BaseCli):
    def __init__(self, col:int=None, log=False, offset:float=0):
        """Shift the specified column so that the first element is zero
        Example::

            range(13, 20) | zeroes() | deref()         # returns [0, 1, 2, 3, 4, 5, 6]
            range(13, 20) | zeroes(offset=5) | deref() # returns [5, 6, 7, 8, 9, 10, 11]
            [2, 3, 1, 4, 7] | zeroes() | deref()       # returns [0, 1, -1, 2, 5]

        Assumes the first element is going to be transformed to zero, thus
        the last example.

        This cli also has log mode, where the natural log of the values will
        be shifted to zero::

            # returns [1.0, 1.5, 0.5, 2.0, 3.5]
            [2, 3, 1, 4, 7] | zeroes(log=True) | aS(round, 2).all() | deref()
            # returns [2.72, 4.08, 1.36, 5.44, 9.51]
            [2, 3, 1, 4, 7] | zeroes(offset=1, log=True) | aS(round, 2).all() | deref()

        This is essentially the same as dividing everything by 2, so that the
        first element turns into 1. Super neat. The 2nd example is equivalent
        to multiplying everything by e/2.

        This cli can function in a table (.col != None)::

            # returns [[0, 'a'], [1, 'b'], [2, 'c'], [3, 'd'], [4, 'e'], [5, 'f'], [6, 'g']]
            [[13, 'a'], [14, 'b'], [15, 'c'], [16, 'd'], [17, 'e'], [18, 'f'], [19, 'g']] | zeroes(0) | deref()

        This cli can also act across multiple list of numbers::

            data = [[2, 3, 1, 4, 7], [1, 4, 3, 6, 9]]
            data2 = [[[2, 'b'], [3, 'c'], [1, 'a'], [4, 'd'], [7, 'g']], [[1, 'a'], [4, 'd'], [3, 'c'], [6, 'f'], [9, 'i']]]

            # returns [[0, 1, -1, 2, 5], [5, 8, 7, 10, 13]]
            data | ~zeroes() | deref()
            # returns [[1, 2, 0, 3, 6], [6, 9, 8, 11, 14]]
            data | ~zeroes(offset=1) | deref()
            # returns [[1.0, 1.5, 0.5, 2.0, 3.5], [3.5, 14.0, 10.5, 21.0, 31.5]]
            data | ~zeroes(log=True) | aS(round, 2).all(2) | deref()
            # returns [[[0, 'b'], [1, 'c'], [-1, 'a'], [2, 'd'], [5, 'g']], [[5, 'a'], [8, 'd'], [7, 'c'], [10, 'f'], [13, 'i']]]
            data2 | ~zeroes(0) | deref()

        So as you can see, the offsets are adjusted so that the first element
        of each list starts from the last element of the previous list

        :param col: column to shift values
        :param offset: custom offset of the minimum value, defaulted to zero
        :param log: whether to zero it linearly or zero it logarithmically"""
        self.col = col
        self.log = log
        self.offset = offset
        self.inverted = False
    def __invert__(self):
        """Returns a copy of this cli that works across multiple lists, chaining their offsets."""
        flipped = zeroes(self.col, self.log, self.offset)
        flipped.inverted = True
        return flipped
    @staticmethod
    def _backend(x):
        """Returns the array module (``np`` or ``torch``) that can operate on
        ``x``, or None if ``x`` should be handled as a plain iterable."""
        if not isinstance(x, settings.arrayTypes): return None
        if isinstance(x, np.ndarray): return np
        if hasTorch and isinstance(x, torch.Tensor): return torch
        return None
    def _chained(self, it):
        """Zeroes every list of ``it`` in turn, starting each one at the
        last (shifted) value of the previous one."""
        col = self.col; log = self.log
        running = self.offset
        for chunk in it:
            chunk = chunk | zeroes(col, log, running)
            backend = self._backend(chunk)
            if backend is None:
                # yes, these have to be materialized, even though perf will
                # suffer: if the user later skips some lists (say with
                # rItem(3)), the skipped ones won't be run, so the following
                # list wouldn't know its offset!
                chunk = list(chunk) if col is None else [list(row) for row in chunk]
                logF = math.log
            else: logF = backend.log
            tail = chunk[-1] if col is None else chunk[-1][col]
            running = logF(tail) if log else tail
            yield chunk
    def __ror__(self, it):
        """Returns the shifted data: an array for array inputs, a generator
        for other non-empty iterables, and ``[]`` if the input is empty."""
        col = self.col; log = self.log; offset = self.offset
        it = init.dfGuard(it)
        if self.inverted: return self._chained(it)
        backend = self._backend(it)
        if backend is not None:
            if col is None:
                if log: return backend.exp(backend.log(it) - (backend.log(it[0]) - offset))
                return it - (it[0] - offset)
            base = (backend.log(it[0, col]) if log else it[0, col]) - offset
            # work on a copy, so the caller's array stays untouched
            out = np.copy(it) if isinstance(it, np.ndarray) else torch.clone(it)
            if log: out[:,col] = backend.exp(backend.log(out[:,col]) - base)
            else: out[:,col] = out[:,col] - base
            return out
        first, it = it | cli.peek()
        if it == []: return []
        if not log:
            if col is None:
                base = first - offset
                return (value - base for value in it)
            base = first[col] - offset
            return ([*r[:col], r[col] - base, *r[col+1:]] for r in it)
        ln = math.log; exp = math.exp
        if col is None:
            base = ln(first) - offset
            return (exp(ln(value) - base) for value in it)
        base = ln(first[col]) - offset
        return ([*r[:col], exp(ln(r[col]) - base), *r[col+1:]] for r in it)
class normalize(BaseCli):
    def __init__(self, col:int=None, mode:int=0):
        """Normalize the data going in. Example::

    arr = np.random.randn(100)+10
    arr | normalize()       # returns array with mean around 0
    arr | normalize(mode=1) # returns array with mean around 0.5, min 0, max 1
    arr = np.random.randn(100, 20)+10
    arr | normalize(2)         # returns array with 2nd (0-indexing!) column have mean around 0. Other columns not touched
    arr | normalize(2, mode=1) # returns array with 2nd (0-indexing!) column have mean around 0.5

The input is never modified: array inputs with a column specified are copied
before the column is overwritten.

:param col: column to apply the normalization to
:param mode: 0 for ``(x - x.mean())/x.std()``, 1 for ``(x - x.min())/(x.max() - x.min())``"""
        self.col = col; self.mode = mode

    @staticmethod
    def _minMax(c, level):
        """Returns ``(min, max)`` of ``c`` reduced along dimension ``level``.

        Torch's ``Tensor.min(dim)``/``Tensor.max(dim)`` return a
        ``(values, indices)`` named tuple instead of a tensor, so the values
        are extracted to give the same kind of result as numpy."""
        min_ = c.min(level); max_ = c.max(level)
        if hasTorch and isinstance(c, torch.Tensor): min_ = min_.values; max_ = max_.values
        return min_, max_

    def _all_array_opt(self, it, level):
        """Normalizes every sub-array found at depth ``level`` of array ``it`` in one vectorized pass.

        NOTE(review): presumably the fast-path hook used when this cli is
        applied through ``.all(level)`` on a numpy array/torch tensor; the
        caller is not visible from here.

        :param it: numpy array or torch tensor, with shape ``(*level, N, ...)``
        :param level: number of leading dimensions to treat as independent batches
        :return: new array/tensor with the same shape as ``it``"""
        col = self.col; n = len(it.shape); s = slice(None, None, None)
        if col is None:
            # (*level, N, *rest (>0)) -> (*level, N, rest) -> (*level, N*rest) -> (*level) (this is mean & std) -> (*level, N, *rest)
            origShape = it.shape # remembered before the helper unsqueeze below, so the result has the input's shape
            if level+1 == len(it.shape): it = it[(*[s]*len(it.shape), None)]; n += 1
            b = it | cli.joinSt(n-level-2).all(level+1); c = b | cli.joinSt().all(level)
            if self.mode == 0:
                mean = c.mean(level)[(*[s]*level,None,None)]
                std = c.std(level)[(*[s]*level,None,None)]
                return ((b - mean)/std).reshape(origShape)
            else:
                min_, max_ = normalize._minMax(c, level)
                min_ = min_[(*[s]*level,None,None)]
                max_ = max_[(*[s]*level,None,None)]
                return ((b - min_)/(max_ - min_)).reshape(origShape)
        else:
            # (*level, N, F, *rest (>0)) -> (*level, N, *rest) -> (*level, N, rest) -> (*level, N*rest) -> (*level) (this is mean & std) -> (*level, N, F, *rest)
            a = np.copy(it) if isinstance(it, np.ndarray) else torch.clone(it); unsqueezed = False # copy, as column `col` is written into below
            if level+2 == len(a.shape): a = a[(*[s]*len(a.shape), None)]; unsqueezed = True; n += 1
            b = a[(*[s]*(level+1),col)] | cli.joinSt(n-level-3).all(level+1) # (*level, N, rest (>0, hence unsqueeze))
            c = b | cli.joinSt(len(b.shape)-level-1).all(level) # (*level, N*rest)
            if self.mode == 0:
                mean = c.mean(level)[(*[s]*level,None,None)]
                std = c.std(level)[(*[s]*level,None,None)]
                b[:] = (b - mean)/std # relies on `b` sharing memory with `a`, as in the original code
            else:
                min_, max_ = normalize._minMax(c, level)
                min_ = min_[(*[s]*level,None,None)]
                max_ = max_[(*[s]*level,None,None)]
                b[:] = (b - min_)/(max_ - min_)
            # undo the helper unsqueeze by joining the last 2 dimensions back together
            return (a | cli.joinSt().all(len(a.shape)-2)) if unsqueezed else a

    def __ror__(self, x):
        """Normalizes ``x`` (entirely, or only column ``self.col``) and returns the result.

        :param x: numpy array, torch tensor, or an iterable (of rows, if a
            column is specified). It is first passed through ``init.dfGuard``
        :return: new array/tensor for array inputs, else a list of rows (or a
            numpy array if no column is specified)"""
        col = self.col; mode = self.mode; x = init.dfGuard(x)
        if isinstance(x, k1lib.settings.cli.arrayTypes):
            if col is None: return (x - x.mean())/x.std() if mode == 0 else (x - x.min())/(x.max() - x.min())
            # Work on a copy, so that the caller's array is left untouched. Integer/bool
            # numpy arrays are promoted to float (astype always copies), because writing
            # the normalized floats back into an integer array would truncate them
            if isinstance(x, np.ndarray): x = x.astype(float) if x.dtype.kind in "iub" else np.copy(x)
            else: x = torch.clone(x)
            xc = x[:,col]
            if mode == 0: x[:,col] = (xc - xc.mean())/xc.std()
            else: x[:,col] = (xc - xc.min())/(xc.max() - xc.min())
            return x
        if col is None: return np.array(list(x)) | self
        it = x | cli.deref(2) # materialized, as it is traversed once for the statistics and once more to rewrite rows
        if len(it) == 0: return []
        values = [row[col] for row in it]
        if mode == 0:
            mean = values | cli.toMean()
            std = values | cli.toStd()
            for row in it: row[col] = (row[col]-mean)/std
        else:
            _min = min(values)
            _max = max(values)
            for row in it: row[col] = (row[col]-_min)/(_max-_min)
        return list(it)
class branch(BaseCli):
    def __init__(self, f, f1, f2):
        """Inline if/else for pipelines: picks one of two functions to run on
the input, based on a predicate. Example::

    3 | branch(lambda x: x>2, lambda x: x+4, lambda x: x+5) # returns 7
    3 | branch(op()>2, op()+4, op()+5) # returns 7
    3 | branch("x>2", "x+4", "x+5") # returns 7
    3 | aS(lambda x: (x + 4) if (x > 2) else (x + 5)) # returns 7

The first 3 lines do the same thing as the 4th. This exists so that you
don't have to write a separate function, or carefully wrap a conditional
expression in parentheses, just to branch once.

:param f: predicate function. If it returns a truthy value, the first function (f1) is used, else the second function (f2)
:param f1: function applied to the input when the predicate passes
:param f2: function applied to the input when the predicate fails"""
        # each function is kept as given, together with its cli.fastF() version that is actually called
        self.f = f
        self._fC = cli.fastF(f)
        self.f1 = f1
        self._fC1 = cli.fastF(f1)
        self.f2 = f2
        self._fC2 = cli.fastF(f2)

    def __ror__(self, it):
        """Runs the predicate on ``it``, then returns the result of the chosen function on ``it``"""
        if self._fC(it):
            return self._fC1(it)
        return self._fC2(it)