# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
"""
This is for all short and random quality-of-life utilities."""
from k1lib.cli.init import patchDefaultDelim, BaseCli, yieldT
import k1lib.cli as cli, k1lib.cli.init as init, numbers, numpy as np, dis
from k1lib.cli.typehint import *
from typing import overload, Iterator, Any, List, Set, Union, Callable
import k1lib, time, math, os, json, dill
from collections import defaultdict
try: import torch; hasTorch = True
except: torch = k1lib.Object().withAutoDeclare(lambda: type("RandomClass", (object, ), {})); hasTorch = False
try: import PIL; hasPIL = True
except: hasPIL = False
plt = k1lib.dep.plt
try: import genpy, rosbag; hasRos1 = True
except: hasRos1 = False
try: import pandas as pd; pd.core; hasPandas = True
except: hasPandas = False
__all__ = ["size", "shape", "item", "rItem", "iden", "join", "wrapList",
"equals", "reverse", "ignore", "rateLimit", "timeLimit", "tab", "indent",
"clipboard", "deref", "bindec", "smooth", "disassemble",
"tree", "lookup", "lookupRange", "getitems", "backup", "sketch", "syncStepper", "zeroes", "normalize", "branch"]
settings = k1lib.settings.cli
def exploreSize(it): # exploreSize
"""Returns first element and length of array. Returns [first item, length]""" # exploreSize
if isinstance(it, str): return None, len(it) # exploreSize
try: return it[0], len(it) # exploreSize
except: pass # exploreSize
sentinel = object(); it = iter(it) # exploreSize
o = next(it, sentinel); count = 1 # exploreSize
if o is sentinel: return None, 0 # exploreSize
try: # exploreSize
while True: next(it); count += 1 # exploreSize
except StopIteration: pass # exploreSize
return o, count # exploreSize
[docs]class size(BaseCli): # size
[docs] def __init__(self, idx=None): # size
"""Returns number of rows and columns in the input.
Example::
# returns (3, 2)
[[2, 3], [4, 5, 6], [3]] | shape()
# returns 3
[[2, 3], [4, 5, 6], [3]] | shape(0)
# returns 2
[[2, 3], [4, 5, 6], [3]] | shape(1)
# returns (2, 0)
[[], [2, 3]] | shape()
# returns (3,)
[2, 3, 5] | shape()
# returns 3
[2, 3, 5] | shape(0)
# returns (3, 2, 2)
[[[2, 1], [0, 6, 7]], 3, 5] | shape()
# returns (1, 3)
["abc"] | shape()
# returns (1, 2, 3)
[torch.randn(2, 3)] | shape()
# returns (2, 3, 5)
shape()(np.random.randn(2, 3, 5))
:class:`shape` is an alias of this cli. Use whichever is more intuitive for you.
:param idx: if not specified, returns a tuple of ints. If specified,
then returns the specific index of the tuple""" # size
super().__init__(); self.idx = idx; # size
if idx is not None: self._f = cli.item(idx) # size
def _all_array_opt(self, it, level): # size
res = np.array(it.shape[level:])[tuple([None]*level)] + np.zeros(it.shape[:level], dtype=int)[(*[slice(None)]*level, None)] # size
return res if self.idx is None else res | cli.rItem(self.idx).all(level) # size
def _typehint(self, inp): # size
if self.idx is not None: return int # size
return tList(int) # size
[docs] def __ror__(self, it:Iterator[str]): # size
idx = self.idx # size
if idx == 0: # super quick path for the really common case # size
try: return len(it) # size
except: # size
try: return exploreSize(it)[1] # size
except: pass # size
if hasPIL and isinstance(it, PIL.Image.Image): return it.size if idx is None else it.size[idx] # size
if hasPandas and isinstance(it, pd.core.frame.DataFrame): s = (len(it), it.size//len(it)); return s if idx is None else s[idx] # size
if idx is None: # size
answer = [] # size
try: # size
while True: # size
if isinstance(it, settings.arrayTypes): # size
return tuple(answer + list(it.shape)) # size
it, s = exploreSize(it); answer.append(s) # size
except TypeError: pass # size
return tuple(answer) # size
return exploreSize(it | self._f)[1] # size
def _jsF(self, meta): # size
fIdx = init._jsFAuto(); dataIdx = init._jsDAuto() # size
post = "" if self.idx is None else f"[{cli.kjs.v(self.idx)}]" # size
return f"{fIdx} = ({dataIdx}) => {dataIdx}.shape(){post}", fIdx # size
shape = size # size
noFill = object() # size
[docs]class item(BaseCli): # item
[docs] def __init__(self, amt:int=1, fill=noFill): # item
"""Returns the first element of the input iterator.
Example::
# returns 0
range(5) | item()
# returns torch.Size([5])
torch.randn(3,4,5) | item(2) | shape()
# returns 3
[] | item(fill=3)
:param amt: how many times do you want to call item() back to back?
:param fill: if iterator length is 0, return this""" # item
self.amt = amt; self.fill = fill # item
self.fillP = [fill] if fill != noFill else [] # preprocessed, to be faster # item
if self.amt != 1: self._f = cli.serial(*(item(fill=self.fill) for _ in range(self.amt))) # item
def _all_array_opt(self, it, level): return it[(*[slice(None, None, None) for i in range(level)], 0)] # item
def _typehint(self, inp): # item
if isinstance(inp, tListIterSet): return inp.child # item
if isinstance(inp, tCollection): return inp.children[0] # item
if isinstance(inp, tArrayTypes): # item
if inp.rank is None: return inp.__class__(inp.child, None) # item
if inp.rank - self.amt >= 1: return inp.__class__(inp.child, inp.rank-self.amt) # item
return inp.child # item
return tAny() # item
[docs] def __ror__(self, it:Iterator[str]): # item
if self.amt != 1: return it | self._f # item
if isinstance(it, settings.arrayTypes): return it[0] # item
if hasPandas and isinstance(it, pd.DataFrame): return it[:1].to_numpy()[0] # item
return next(iter(init.dfGuard(it)), *self.fillP) # item
def _jsF(self, meta): # item
fIdx = init._jsFAuto(); dataIdx = init._jsDAuto(); _slice = "".join(["[0]"]*self.amt) # item
return f"{fIdx} = ({dataIdx}) => {dataIdx}{_slice}", fIdx # item
[docs]class rItem(BaseCli): # rItem
[docs] def __init__(self, idx:int): # rItem
"""Combines ``rows(idx) | item()``, as this is a pretty common pattern.
Example::
iter(range(10)) | rItem(4) # returns 4
""" # rItem
self.idx = idx; self.arrayTypes = (*settings.arrayTypes, list, tuple) # rItem
def _all_array_opt(self, it, level:int): return it[(*[slice(None, None, None) for i in range(level)], self.idx)] # rItem
[docs] def __ror__(self, it): # rItem
idx = self.idx # rItem
if isinstance(it, self.arrayTypes): return it[idx] # rItem
if hasPandas and isinstance(it, pd.DataFrame): return it[idx:idx+1].to_numpy()[0] # rItem
for i, e in zip(range(self.idx+1), it): pass # rItem
return e # rItem
def _jsF(self, meta): # rItem
fIdx = init._jsFAuto(); dataIdx = init._jsDAuto() # rItem
return f"{fIdx} = ({dataIdx}) => {dataIdx}[{cli.kjs.v(self.idx)}]", fIdx # rItem
[docs]class iden(BaseCli): # iden
[docs] def __init__(self): # iden
"""Yields whatever the input is. Useful for multiple streams.
Example::
# returns range(5)
range(5) | iden()""" # iden
super().__init__() # iden
def _all_array_opt(self, it, level): return it # iden
def _typehint(self, inp): return inp # iden
[docs] def __ror__(self, it:Iterator[Any]): return it # iden
def _jsF(self, meta): # iden
fIdx = init._jsFAuto(); dataIdx = init._jsDAuto() # iden
return f"{fIdx} = ({dataIdx}) => {dataIdx}", fIdx # iden
[docs]class join(BaseCli): # join
[docs] def __init__(self, delim:str=None): # join
r"""Merges all strings into 1, with `delim` in the middle. Basically
:meth:`str.join`. Example::
# returns '2\na'
[2, "a"] | join("\n")""" # join
super().__init__(); self.delim = patchDefaultDelim(delim) # join
def _typehint(self, inp): return str # join
[docs] def __ror__(self, it:Iterator[str]): # join
return self.delim.join(init.dfGuard(it) | cli.apply(str)) # join
def _jsF(self, meta): # join
fIdx = init._jsFAuto(); dataIdx = init._jsDAuto() # join
return f"{fIdx} = ({dataIdx}) => {dataIdx}.join({json.dumps(self.delim)})", fIdx # join
[docs]class wrapList(BaseCli): # wrapList
[docs] def __init__(self): # wrapList
"""Wraps inputs inside a list. There's a more advanced cli tool
built from this, which is :meth:`~k1lib.cli.structural.unsqueeze`. Example::
# returns [5]
5 | wrapList()""" # wrapList
super().__init__() # wrapList
def _all_array_opt(self, it, level): return it[(*[slice(None)]*level, None)] # wrapList
def _typehint(self, inp): return tList(inp) # wrapList
[docs] def __ror__(self, it) -> List[Any]: # wrapList
if isinstance(it, settings.arrayTypes): return it[None] # wrapList
return [it] # wrapList
def _jsF(self, meta): # wrapList
fIdx = init._jsFAuto(); dataIdx = init._jsDAuto() # wrapList
return f"{fIdx} = ({dataIdx}) => [{dataIdx}]", fIdx # wrapList
class _EarlyExp(Exception): pass # _EarlyExp
[docs]class equals: # equals
[docs] def __init__(self): # equals
"""Checks if all incoming columns/streams are identical""" # equals
super().__init__() # equals
[docs] def __ror__(self, streams:Iterator[Iterator[str]]): # equals
streams = list(streams) # equals
for row in zip(*streams): # equals
sampleElem = row[0] # equals
try: # equals
for elem in row: # equals
if sampleElem != elem: yield False; raise _EarlyExp() # equals
yield True # equals
except _EarlyExp: pass # equals
[docs]class reverse(BaseCli): # reverse
[docs] def __init__(self): # reverse
"""Reverses incoming list.
Example::
# returns [3, 5, 2]
[2, 5, 3] | reverse() | deref()""" # reverse
super().__init__() # reverse
def _all_array_opt(self, it, level): return it[(*[slice(None)]*level, slice(None, None, -1))] # reverse
def _typehint(self, inp): # reverse
if isinstance(inp, tListIterSet): return tIter(inp.child) # reverse
return tAny() # reverse
[docs] def __ror__(self, it:Iterator[str]) -> List[str]: # reverse
if isinstance(it, settings.arrayTypes): return it[::-1] # reverse
if hasPandas and isinstance(it, pd.core.arraylike.OpsMixin): return it[::-1] # reverse
return reversed(list(it)) # reverse
def _jsF(self, meta): # reverse
fIdx = init._jsFAuto(); dataIdx = init._jsDAuto() # reverse
return f"{fIdx} = ({dataIdx}) => [...{dataIdx}].reverse()", fIdx # reverse
[docs]class ignore(BaseCli): # ignore
[docs] def __init__(self): # ignore
r"""Just loops through everything, ignoring the output.
Example::
# will just return an iterator, and not print anything
[2, 3] | apply(lambda x: print(x))
# will prints "2\n3"
[2, 3] | apply(lambda x: print(x)) | ignore()""" # ignore
super().__init__() # ignore
def _all_array_opt(self, it, level): return it # ignore
def _typehint(self, inp): return type(None) # ignore
[docs] def __ror__(self, it:Iterator[Any]): # ignore
if isinstance(it, settings.arrayTypes): return # ignore
if hasPandas and isinstance(it, pd.core.arraylike.OpsMixin): return # ignore
for _ in it: pass # ignore
def _jsF(self, meta): # ignore
fIdx = init._jsFAuto(); dataIdx = init._jsDAuto() # ignore
return f"{fIdx} = ({dataIdx}) => {dataIdx}", fIdx # ignore
[docs]class rateLimit(BaseCli): # rateLimit
[docs] def __init__(self, f, delay=0.1): # rateLimit
"""Limits the execution flow rate upon a condition.
Example::
s = 0; semaphore = 0
def heavyAsyncOperation(i):
global semaphore, s
semaphore += 1
s += i; time.sleep(1)
semaphore -= 1; return i**2
# returns (20,), takes 1s to run
range(20) | applyTh(heavyAsyncOperation, 100) | shape()
# returns (20,), takes 4s to run (20/5 = 4)
range(20) | rateLimit(lambda: semaphore < 5) | applyTh(heavyAsyncOperation, 100) | shape()
The first test case is not rate-limited, so it will run all 20 threads at the
same time, and all of them will finish after 1 second.
The second test case is rate-limited, so that there can only be 5 concurrently
executing threads because of the semaphore count check. Therefore this takes
around 4 seconds to run.
:param f: checking function. Should return true if execution is allowed
:param delay: delay in seconds between calling ``f()``""" # rateLimit
self.f = f; self.delay = delay # rateLimit
def _typehint(self, inp): # rateLimit
if isinstance(inp, tListIterSet): return tIter(inp.child) # rateLimit
if isinstance(inp, tArrayTypes): # rateLimit
if inp.rank is None: return tIter(inp) # rateLimit
if inp.rank >= 2: return tIter(inp.__class__(inp.child, inp.rank - 1)) # rateLimit
return tIter(inp.child) # rateLimit
if isinstance(inp, tCollection): return inp # rateLimit
return tAny() # rateLimit
[docs] def __ror__(self, it): # rateLimit
f = self.f; delay = self.delay # rateLimit
for e in init.dfGuard(it): # rateLimit
while not f(): time.sleep(delay) # rateLimit
yield e # rateLimit
[docs] @staticmethod # rateLimit
def cpu(maxUtilization=90): # rateLimit
"""Limits flow rate when cpu utilization is more than a specified
percentage amount. Needs to install the package ``psutil`` to actually work.
Example::
# returns [0, 1, 4, 9, 16]
range(5) | rateLimit.cpu() | apply(op()**2) | deref()""" # rateLimit
import psutil # rateLimit
return rateLimit(lambda: psutil.cpu_percent() < maxUtilization) # rateLimit
[docs]class timeLimit(BaseCli): # timeLimit
[docs] def __init__(self, t): # timeLimit
"""Caps the flow after a specified amount of time has
passed. Example::
# returns 20, or roughly close to that
repeatF(lambda: time.sleep(0.1)) | timeLimit(2) | shape(0)""" # timeLimit
self.t = t # timeLimit
def _typehint(self, inp): # timeLimit
if isinstance(inp, tListIterSet): return tIter(inp.child) # timeLimit
if isinstance(inp, tArrayTypes): # timeLimit
if inp.rank is None: return tIter(inp) # timeLimit
if inp.rank >= 2: return tIter(inp.__class__(inp.child, inp.rank - 1)) # timeLimit
return tIter(inp.child) # timeLimit
if isinstance(inp, tCollection): return inp # timeLimit
return tAny() # timeLimit
[docs] def __ror__(self, it): # timeLimit
_time = time.time; endTime = _time() + self.t # timeLimit
for e in init.dfGuard(it): # timeLimit
yield e # timeLimit
if _time() > endTime: break # timeLimit
[docs]def tab(pad:str=" "*4): # tab
"""Indents incoming string iterator.
Example::
# prints out indented 0 to 9
range(10) | tab() | headOut()""" # tab
return cli.apply(lambda x: f"{pad}{x}") # tab
indent = tab # tab
[docs]class clipboard(BaseCli): # clipboard
[docs] def __init__(self): # clipboard
"""Saves the input to clipboard.
Example::
# copies "abc" into the clipboard. Just use Ctrl+V to paste as usual
"abc" | clipboard()""" # clipboard
import pyperclip; self.pyperclip = pyperclip # clipboard
def _typehint(self, inp): return type(None) # clipboard
[docs] def __ror__(self, s): self.pyperclip.copy(s) # clipboard
a = [numbers.Number, np.number, str, bool, bytes, k1lib.UValue, cli.conv.Audio] # clipboard
if hasTorch: a.append(torch.nn.Module) # clipboard
if hasRos1: a.append(rosbag.bag.BagMessage) # clipboard
if hasPandas: a.append(pd.core.arraylike.OpsMixin) # clipboard
settings.atomic.add("deref", tuple(a), "used by deref") # clipboard
Tensor = torch.Tensor; atomic = settings.atomic # clipboard
class inv_dereference(BaseCli): # inv_dereference
def __init__(self, igT=False): # inv_dereference
"""Kinda the inverse to :class:`dereference`""" # inv_dereference
super().__init__(); self.igT = igT # inv_dereference
def __ror__(self, it:Iterator[Any]) -> List[Any]: # inv_dereference
for e in it: # inv_dereference
if e is None or isinstance(e, atomic.deref): yield e # inv_dereference
elif isinstance(e, settings.arrayTypes): # inv_dereference
if not self.igT and len(e.shape) == 0: yield e.item() # inv_dereference
else: yield e # inv_dereference
else: # inv_dereference
try: yield e | self # inv_dereference
except: yield e # inv_dereference
_rosmsg_tempfile = [None]; _rosmsg_autoInc = k1lib.AutoIncrement() # inv_dereference
def rosmsg2BagMessage(msg): # kinda abandoned. Turns out you can't pickle a BagMessage cleanly afterall. I kinda have to do it the long way. If you want to be able to serialize a message, just do `obj | deref()`, it will wrap around using RosMsg(), which is serializable # rosmsg2BagMessage
if _rosmsg_tempfile[0] is None: _rosmsg_tempfile[0] = b"" | cli.file() # rosmsg2BagMessage
fn = f"{_rosmsg_tempfile[0]}_{os.getpid()}_{_rosmsg_autoInc()}" # rosmsg2BagMessage
with rosbag.Bag(fn, "w") as bag: bag.write("/default", msg) # rosmsg2BagMessage
res = rosbag.Bag(fn, "r").read_messages() | cli.item() # rosmsg2BagMessage
os.remove(fn); return res # rosmsg2BagMessage
_rosmsg_tempfile2 = [None]; _rosmsg_autoInc2 = k1lib.AutoIncrement() # rosmsg2BagMessage
def _rosmsg_getFn2(): # _rosmsg_getFn2
if _rosmsg_tempfile2[0] is None: _rosmsg_tempfile2[0] = b"" | cli.file(); os.remove(_rosmsg_tempfile2[0]) # _rosmsg_getFn2
return f"{_rosmsg_tempfile2[0]}_{os.getpid()}_{_rosmsg_autoInc2()}" # _rosmsg_getFn2
class RosMsg: # RosMsg
def __init__(self, msg): self._ab_sentinel = True; self.__msg = msg; self._ab_sentinel = False # RosMsg
def __getattr__(self, attr): # RosMsg
if attr == "__msg": return self.__msg # RosMsg
return getattr(self.__msg, attr) # RosMsg
def __getstate__(self): # RosMsg
fn = _rosmsg_getFn2() # RosMsg
with rosbag.Bag(fn, "w") as bag: bag.write("/default", self.__msg) # RosMsg
with open(fn, "rb") as f: raw = f.read() # RosMsg
os.remove(fn); return {"raw": raw} # RosMsg
def __setstate__(self, d): # RosMsg
fn = _rosmsg_getFn2() # RosMsg
with open(fn, "wb") as f: f.write(d["raw"]) # RosMsg
with rosbag.Bag(fn) as bag: self.__msg = next(bag.read_messages()).message # RosMsg
os.remove(fn) # RosMsg
def __repr__(self): return self.__msg.__repr__() # RosMsg
_rosMsgArrayTypes = k1lib.settings.cli.arrayTypes # RosMsg
class RosMsgPlaceholder: # RosMsgPlaceholder
def __init__(self, idx): self.idx = idx # RosMsgPlaceholder
def _rosmsg_complex_deref_replace(it, autoInc, msgs): # _rosmsg_complex_deref_replace
if isinstance(it, np.number): return it.item() # _rosmsg_complex_deref_replace
elif isinstance(it, k1lib.settings.cli.atomic.deref): return it # _rosmsg_complex_deref_replace
elif isinstance(it, _rosMsgArrayTypes): return it # _rosmsg_complex_deref_replace
elif isinstance(it, dict): _d = {k: _rosmsg_complex_deref_replace(v, autoInc, msgs) for k, v in it.items()}; return _d # _rosmsg_complex_deref_replace
elif isinstance(it, tuple): _t = tuple(_rosmsg_complex_deref_replace(k, autoInc, msgs) for k in it); return _t # _rosmsg_complex_deref_replace
elif isinstance(it, set): _s = set (_rosmsg_complex_deref_replace(k, autoInc, msgs) for k in it); return _s # _rosmsg_complex_deref_replace
elif isinstance(it, genpy.message.Message): idx = autoInc(); msgs[idx] = it; return RosMsgPlaceholder(idx) # _rosmsg_complex_deref_replace
elif isinstance(it, RosMsg): idx = autoInc(); msgs[idx] = it.__msg; return RosMsgPlaceholder(idx) # _rosmsg_complex_deref_replace
try: iter(it) # _rosmsg_complex_deref_replace
except: return it # _rosmsg_complex_deref_replace
answer = [] # _rosmsg_complex_deref_replace
for e in it: # _rosmsg_complex_deref_replace
if e is cli.yieldT: return answer # _rosmsg_complex_deref_replace
answer.append(_rosmsg_complex_deref_replace(e, autoInc, msgs)) # _rosmsg_complex_deref_replace
return answer # _rosmsg_complex_deref_replace
def _rosmsg_complex_deref_reconstruct(it, msgs): # _rosmsg_complex_deref_reconstruct
if isinstance(it, np.number): return it.item() # _rosmsg_complex_deref_reconstruct
elif isinstance(it, k1lib.settings.cli.atomic.deref): return it # _rosmsg_complex_deref_reconstruct
elif isinstance(it, _rosMsgArrayTypes): return it # _rosmsg_complex_deref_reconstruct
elif isinstance(it, dict): _d = {k: _rosmsg_complex_deref_reconstruct(v, msgs) for k, v in it.items()}; return _d # _rosmsg_complex_deref_reconstruct
elif isinstance(it, tuple): _t = tuple(_rosmsg_complex_deref_reconstruct(k, msgs) for k in it); return _t # _rosmsg_complex_deref_reconstruct
elif isinstance(it, set): _s = set (_rosmsg_complex_deref_reconstruct(k, msgs) for k in it); return _s # _rosmsg_complex_deref_reconstruct
elif isinstance(it, RosMsgPlaceholder): return msgs[it.idx] # _rosmsg_complex_deref_reconstruct
try: iter(it) # _rosmsg_complex_deref_reconstruct
except: return it # _rosmsg_complex_deref_reconstruct
answer = [] # _rosmsg_complex_deref_reconstruct
for e in it: # _rosmsg_complex_deref_reconstruct
if e is cli.yieldT: return answer # _rosmsg_complex_deref_reconstruct
answer.append(_rosmsg_complex_deref_reconstruct(e, msgs)) # _rosmsg_complex_deref_reconstruct
return answer # _rosmsg_complex_deref_reconstruct
class RosMsgComplex: # RosMsgComplex
def __init__(self, data): # RosMsgComplex
"""An attempt to speed up serialization of ROS messages.
Normally, I'd do this::
[msg1, msg2, ...] | deref() | aS(dill.dumps) | file("...")
But this is a little inefficient as the process of writing to and reading from a temp bag file
is not that fast. So this kinda bunches up all messages, write them into a single bag file, and
have clever mechanism to reconstruct the structure.
Turns out lots of messages can bog down the system. This does reduce load time by 2 times and disk
size by 3 times. So it's effective, but just not wildly effective. This is not exposed automatically
on the docs cause I don't feel like it's fast enough to justify that, but I couldn't just delete this.""" # RosMsgComplex
self.data = data # RosMsgComplex
def __getstate__(self): # RosMsgComplex
fn = _rosmsg_getFn2() # RosMsgComplex
with rosbag.Bag(fn, "w") as bag: # RosMsgComplex
msgs = {}; struct = _rosmsg_complex_deref_replace(self.data, k1lib.AutoIncrement(prefix="/_rosmsg_"), msgs) # RosMsgComplex
for k, v in msgs.items(): bag.write(k, v) # RosMsgComplex
with open(fn, "rb") as f: raw = f.read() # RosMsgComplex
res = {"struct": dill.dumps(struct), "raw": raw}; os.remove(fn); return res # RosMsgComplex
def __setstate__(self, d): # RosMsgComplex
fn = _rosmsg_getFn2() # RosMsgComplex
with open(fn, "wb") as f: f.write(d["raw"]) # RosMsgComplex
msgs = {x.topic:x for x in rosbag.Bag(fn).read_messages()} # RosMsgComplex
self.data = _rosmsg_complex_deref_reconstruct(d["struct"], msgs); os.remove(fn) # RosMsgComplex
[docs]class deref(BaseCli): # deref
[docs] def __init__(self, maxDepth=float("inf"), igT=True): # deref
"""Recursively converts any iterator into a list.
Example::
iter(range(5)) # returns something like "<range_iterator at 0x7fa8c52ca870>"
iter(range(5)) | deref() # returns [0, 1, 2, 3, 4]
[2, 3, yieldT, 6] | deref() # returns [2, 3], yieldT stops things early
You can also specify a ``maxDepth``::
iter([range(3)]) | deref(0) # returns something like "<list_iterator at 0x7f810cf0fdc0>"
iter([range(3)]) | deref(1) # returns [range(3)]
iter([range(3)]) | deref(2) # returns [[0, 1, 2]]
There are a few classes/types that are considered atomic, and :class:`deref`
will never try to iterate over it. If you wish to change it, do something like::
settings.cli.atomic.deref = (int, float, ...)
:param maxDepth: maximum depth to dereference. Starts at 0 for not doing anything
at all
:param igT: short for "ignore tensor". If True, then don't loop over :class:`torch.Tensor`
and :class:`numpy.ndarray` internals""" # deref
super().__init__(); self.igT = igT # deref
self.maxDepth = maxDepth; self.depth = 0 # deref
if hasTorch: self.arrayType = (torch.Tensor, np.ndarray) if k1lib.settings.startup.or_patch.numpy else torch.Tensor # deref
else: self.arrayType = (np.ndarray,) if k1lib.settings.startup.or_patch.numpy else () # deref
def _typehint(self, inp, depth=float("inf")): # deref
if depth == 0: return inp # deref
if depth == float("inf"): depth = self.maxDepth # deref
if isinstance(inp, type) and issubclass(inp, atomic.deref): return inp # deref
if isinstance(inp, tArrayTypes): # deref
if self.igT: return inp # deref
if inp.rank is None: return tList(tAny()) # deref
if inp.rank == 1: # deref
if isinstance(inp, tTensor): # deref
return tList(type(torch.tensor(3, dtype=inp.child).item())) # deref
if isinstance(inp, tNpArray): # deref
return tList(type(np.array(3, dtype=inp.child).item())) # deref
return tList(self._typehint(inp.item(), depth-1)) # deref
if isinstance(inp, tListIterSet): # deref
return tList(self._typehint(inp.child, depth-1)) # deref
if isinstance(inp, tCollection): # deref
return tCollection(*(self._typehint(e, depth-1) for e in inp.children)) # deref
return tAny() # deref
[docs] def __ror__(self, it:Iterator[Any]) -> List[Any]: # deref
if self.depth >= self.maxDepth: return it # deref
elif isinstance(it, np.number): return it.item() # deref
elif isinstance(it, atomic.deref): return it # deref
elif isinstance(it, self.arrayType): # deref
if self.igT: return it # deref
if len(it.shape) == 0: return it.item() # deref
elif isinstance(it, dict): self.depth += 1; _d = {k: self.__ror__(v) for k, v in it.items()}; self.depth -= 1; return _d # deref
elif isinstance(it, tuple): self.depth += 1; _t = tuple(self.__ror__(k) for k in it); self.depth -= 1; return _t # deref
elif isinstance(it, set): self.depth += 1; _s = set (self.__ror__(k) for k in it); self.depth -= 1; return _s # deref
elif hasRos1 and isinstance(it, genpy.message.Message): return RosMsg(it) # return rosmsg2BagMessage(it) # deref
try: iter(it) # deref
except: return it # deref
self.depth += 1; answer = [] # deref
for e in it: # deref
if e is cli.yieldT: return answer # deref
answer.append(self.__ror__(e)) # deref
self.depth -= 1; return answer # deref
[docs] def __invert__(self) -> BaseCli: # deref
"""Returns a :class:`~k1lib.cli.init.BaseCli` that makes
everything an iterator. Not entirely sure when this comes in handy, but it's
there.""" # deref
return inv_dereference(self.igT) # deref
def _jsF(self, meta): # deref
fIdx = init._jsFAuto(); dataIdx = init._jsDAuto() # deref
return f"{fIdx} = ({dataIdx}) => {dataIdx}", fIdx # deref
[docs] @staticmethod # deref
def js(): # deref
"""Deref incoming object and turn them into a js object (NOT json string!).
Example::
# returns "[...Array(10).keys()]"
range(10) | deref.json()
How does it know to transpile it? Based on the dictionary at `settings.cli.kjs.jsonF`
and the object's "._jsonF" function. Say you have a custom list object, you can do
something like this::
class CustomList:
def __init__(self): ...
def _jsonF(self): return "your js string here"
Or, you can do something like this::
class CustomList: ...
settings.cli.kjs.jsonF[CustomList] = lambda obj: "your js string here"
A variety of data types are included out of the box already for common types,
view the source code of this method to check them out.""" # deref
jsonF = settings.kjs.jsonF # deref
jsonF[list] = lambda x: "[" + ", ".join([deref_js(e) for e in x]) + "]" # deref
jsonF[str] = lambda x: json.dumps(x) # deref
jsonF[tuple] = jsonF[list]; jsonF[set] = lambda x: "new Set(" + jsonF[list](x) + ")" # deref
jsonF[type(None)] = lambda x: "null" # deref
jsonF[np.ndarray] = lambda x: json.dumps(x | deref(igT=False)) # deref
if hasTorch: jsonF[torch.Tensor] = lambda x: json.dumps(x | deref(igT=False)) # deref
jsonF[type(iter(range(10)))] = lambda x: "[" + ", ".join([str(e) for e in x]) + "]" # deref
jsonF[type((x for x in range(0)))] = jsonF[list] # deref
jsonF[type({}.keys())] = jsonF[list]; jsonF[type({}.values())] = jsonF[list] # deref
jsonF[dict] = lambda x: "{" + ", ".join([f"{json.dumps(k)}: {deref_js(v)}" for k,v in x.items()]) + "}" # deref
jsonF[defaultdict] = jsonF[dict] # deref
deref.js = lambda: cli.aS(deref_js); return deref.js() # initializes at runtime, then patches deref.json() to get a faster path! # deref
def deref_js(obj): # deref_js
# only 2 special cases, perf considerations, everything else is pluggable # deref_js
if isinstance(obj, bool): return "true" if obj else "false" # deref_js
if isinstance(obj, (numbers.Number, np.number)): return str(obj) # deref_js
fn = settings.kjs.jsonF.get(type(obj), None) # deref_js
if fn: return fn(obj) # deref_js
if hasattr(obj, "_jsonF"): return obj._jsonF() # deref_js
raise Exception(f"Don't know how to transcribe object with class {type(obj)}. Either add the serialization function to `settings.cli.kjs.jsonF`, or implement the function `._jsonF()` to your custom class") # deref_js
[docs]class bindec(BaseCli): # bindec
[docs] def __init__(self, cats:List[Any], f=None): # bindec
"""Binary decodes the input.
Example::
# returns ['a', 'c']
5 | bindec("abcdef")
# returns 'a,c'
5 | bindec("abcdef", join(","))
:param cats: categories
:param f: transformation function of the selected elements. Defaulted to :class:`~k1lib.cli.conv.toList`, but others like :class:`join` is useful too""" # bindec
self.cats = cats; self.f = f or cli.toList() # bindec
[docs] def __ror__(self, it): # bindec
it = bin(int(it))[2:][::-1] # bindec
return (e for i, e in zip(it, self.cats) if i == '1') | self.f # bindec
settings.add("smooth", 10, "default smooth amount, used in utils.smooth") # bindec
[docs]class smooth(BaseCli): # smooth
[docs] def __init__(self, consecutives=None, windowing=False): # smooth
"""Smoothes out the input stream.
Literally just a shortcut for::
batched(consecutives) | toMean().all()
Example::
# returns [4.5, 14.5, 24.5]
range(30) | smooth(10) | deref()
Smoothing over :class:`torch.Tensor` or :class:`numpy.ndarray` will
be much faster::
# returns torch.Tensor with shape (2)
torch.randn(10, 3, 4) | smooth(4)
The default consecutive value is in ``settings.cli.smooth``. This
is useful if you are smoothing over multiple lists at the same
time, like this::
# can change a single smooth value temporarily here, and all sequences will be smoothed in the same way
with settings.cli.context(smooth=5):
x = list(np.linspace(-2, 2, 50))
y = x | apply(op()**2) | deref()
plt.plot(x | smooth() | deref(), y | smooth() | deref())
:param consecutives: if not defined, then used the value inside ``settings.cli.smooth``""" # smooth
n = consecutives or settings.smooth; self.b = cli.window(n) if windowing else cli.batched(n) # smooth
self.consecutives = consecutives; self.windowing = windowing # smooth
def _all_array_opt(self, it, level): return it | (self.b | cli.toMean().all()).all(level) # smooth
[docs] def __ror__(self, it): return init.dfGuard(it) | self.b | cli.toMean().all() # smooth
def _jsF(self, meta): # smooth
if self.windowing: raise Exception(f"._jsF() does not support windowing in smooth() yet") # smooth
fIdx = init._jsFAuto(); dataIdx = init._jsDAuto() # smooth
return f"{fIdx} = ({dataIdx}) => {dataIdx}.smooth({cli.kjs.v(self.consecutives)})", fIdx # smooth
def _f(): pass # _f
_code = type(_f.__code__) # _f
[docs]def disassemble(f=None): # disassemble
"""Disassembles anything piped into it.
Normal usage::
def f(a, b):
return a**2 + b
# both of these print out disassembled info
f | disassemble()
disassemble(f)
# you can pass in lambdas
disassemble(lambda x: x + 3)
# or even raw code
"lambda x: x + 3" | disassemble()""" # disassemble
c = f # disassemble
if c is None: return cli.aS(disassemble) # disassemble
if isinstance(c, str): c = compile(c, "", "exec") # disassemble
try: c = c.__code__ # disassemble
except: pass # disassemble
if not isinstance(c, _code): raise RuntimeError(f"`{c}` is not a code object/function/class method/string code") # disassemble
print(f"co_argcount: {c.co_argcount}") # disassemble
print(f"co_cellvars: {c.co_cellvars}") # disassemble
print(f"co_consts: {c.co_consts}") # disassemble
print(f"co_filename: {c.co_filename}") # disassemble
print(f"co_firstlineno: {c.co_firstlineno}") # disassemble
print(f"co_flags: {c.co_flags}") # disassemble
print(f"co_freevars: {c.co_freevars}") # disassemble
print(f"co_kwonlyargcount: {c.co_kwonlyargcount}") # disassemble
print(f"co_lnotab: {c.co_lnotab | cli.apply(str) | join(' ')}") # disassemble
print(f"co_name: {c.co_name}") # disassemble
print(f"co_names: {c.co_names}") # disassemble
print(f"co_nlocals: {c.co_nlocals}") # disassemble
print(f"co_posonlyargcount: {c.co_posonlyargcount}") # disassemble
print(f"co_stacksize: {c.co_stacksize}") # disassemble
print(f"co_varnames: {c.co_varnames}") # disassemble
print(f"Disassembly:"); dis.disassemble(c) # disassemble
with k1lib.captureStdout() as out: # disassemble
c.co_consts | cli.filt(lambda x: "code" in str(type(x))) | cli.tee(lambda _: "----------------------- inner code object -----------------------\n") | cli.apply(disassemble) | cli.ignore() # disassemble
out() | cli.filt(cli.op().strip() != "") | cli.apply("|" + cli.op()) | cli.indent() | cli.stdout() # disassemble
shortName = lambda s: s.split(os.sep)[-1] # disassemble
[docs]def tree(fL=10, dL=10, depth=float("inf"), ff:Callable[[str], bool]=(lambda s: True), df:Callable[[str], bool]=(lambda s: True)): # tree
"""Recursively gets all files and folders. Output format might be a bit
strange, so this is mainly for visualization. Example::
"." | tree() | deref()
This is way less powerful and structured than clis from the module :mod:`k1lib.cli.ktree`.
Check that out. This cli is mainly for backwards compability.
:param fL: max number of file per directory included in output
:param dL: max number of child directories per directory included in output
:param depth: explore depth
:param ff: optional file filter function
:param df: optional directory filter function""" # tree
processFolders = cli.apply(lambda x: [shortName(x), x]) | cli.apply(lambda x: x | tree(fL, dL, depth-1, ff, df) if depth > 0 else [], 1) | cli.toDict() # tree
a = cli.filt(os.path.isfile) | cli.filt(ff) | cli.head(fL) | cli.apply(shortName) | cli.aS(set) # tree
b = ~cli.filt(os.path.isfile) | cli.filt(df) | cli.head(dL) | processFolders # tree
return cli.ls() | ~cli.sortF(os.path.isfile) | (a & b) # tree
[docs]class lookup(BaseCli): # lookup
[docs] def __init__(self, d:dict, col:int=None, fill=None, mode:str="error"): # lookup
"""Looks up items from a dictionary/object. Example::
d = {"a": 3, "b": 5, "c": 52}
"abcca" | lookup(d) | deref() # returns [3, 5, 52, 52, 3]
"abccad" | lookup(d) | deref() # raises Exception, as key "d" does not exist
"abccad" | lookup(d, fill="(not found)") | deref() # returns [3, 5, 52, 52, 3, '(not found)'], mode automatically switched to "fill"
"abccad" | lookup(d, mode="fill") | deref() # returns [3, 5, 52, 52, 3, None]. Do this when you really want to return None
"abccad" | lookup(d, fill=input) | deref() # returns [3, 5, 52, 52, 3, 'd'], mode automatically switched to "input"
"abccad" | lookup(d, mode="input") | deref() # returns [3, 5, 52, 52, 3, 'd'], similar to above
"abccad" | lookup(d, mode="rm") | deref() # returns [3, 5, 52, 52, 3], removing the unknown element
[range(5), "abcca"] | transpose() | lookup(d, 1) | deref() # returns [[0, 3], [1, 5], [2, 52], [3, 52], [4, 3]]
The ``mode`` param needs a little explaning. It specifies what should happen when an element is not found
within the given dictionary. There are 3 modes total:
- error: if ``.fill`` is not None, then throws an error. If ``.fill`` is specified, then this acts like mode "fill" instead
- input: returns whatever the input element is
- rm: removes (aka ignore) the element
- fill: returns the arg ``.fill``
:param d: any object that can be sliced with the inputs
:param col: if None, lookup on each row, else lookup a specific column only
:param fill: fill value for elements that are not in the provided dictionary. Explained more above
:param mode: "error", "input", "rm", "fill". Explained more above""" # lookup
self.d = d; self.col = col; self.fill = fill # lookup
if mode == "error": # override .mode so that it's backwards compatible # lookup
if fill is input: mode = "input"; fill = None # lookup
elif fill is not None: mode = "fill" # lookup
self.mode = mode; self.rmSentinel = rmSentinel = object() # lookup
if mode == "error": f = lambda e: d[e] # lookup
elif mode == "input": f = lambda e: d.get(e, e) # lookup
elif mode == "rm": f = lambda e: d.get(e, rmSentinel) # lookup
elif mode == "fill": f = lambda e: d.get(e, fill) # lookup
else: raise Exception("Invalid mode. Only 'error', 'input', 'rm' and 'fill' are allowed") # lookup
self.f = f # lookup
def fa(it, col): # lookup
if mode == "rm": return it | cli.apply(lambda e: d.get(e, rmSentinel), col) | cli.filt(lambda x: x is not rmSentinel, col) # lookup
return it | cli.apply(f, col) # lookup
self.fa = fa # lookup
def _typehint(self, inp): # lookup
t = inferType(list(self.d.values())) # lookup
if isinstance(t, tListIterSet): return tIter(t.child) # lookup
if isinstance(t, tCollection): return tIter(tLowest(*t.children)) # lookup
return tIter(tAny()) # lookup
[docs] def __ror__(self, it): # lookup
col = self.col # lookup
if hasPandas and isinstance(it, pd.DataFrame): # lookup
if col is None: it = init.dfGuard(it) # lookup
else: # lookup
f = self.f; rmSentinel = self.rmSentinel; c = [f(e) for e in it[list(it)[col]]]; it = it.replaceCol(list(it)[col], c) # lookup
return it.iloc[[i for i, e in enumerate(c) if e is not rmSentinel]] if self.mode == "rm" else it # lookup
# return pd.DataFrame({getattr(c, "name", ogName if i == col else next(genName)):c for i,c in enumerate(cols)}) # lookup
return self.fa(it, col) # lookup
def _jsF(self, meta): # lookup
if self.mode not in ("input", "rm", "fill"): raise Exception(f"lookup()._jsF() only supports modes 'input', 'rm' and 'fill'. Either specify a mode, or a default fill value") # lookup
fIdx = init._jsFAuto(); dictIdx = f"{init._jsDAuto()}_{round(time.time())}"; dataIdx = init._jsDAuto() # lookup
return f"//k1_moveOutStart\n{dictIdx} = {json.dumps(self.d)}; //k1_moveOutEnd\n{fIdx} = ({dataIdx}) => {dataIdx}.lookup({dictIdx}, {cli.kjs.v(self.col)}, {cli.kjs.v(self.fill)}, `{self.mode}`)", fIdx # lookup
_sorted = sorted # lookup
[docs]class lookupRange(BaseCli): # lookupRange
[docs] def __init__(self, ranges, col:int=None, sorted=True, fill=None, mode="error"): # lookupRange
"""Looks up values within some range.
Example::
ranges = [[2, 3, "a"], [4, 5, "b"], [6, 7, "c"]]
vs = [1, 1.5, 2, 2.5, 3, 3.5, 4, 4.5, 5, 5.5]
vs | lookupRange(ranges, mode="error") | deref() # raises an exception cause it can't find "1" in any ranges
vs | lookupRange(ranges, mode="fill") | deref() # returns [None, None, 'a', 'a', None, None, 'b', 'b', None, None]
vs | lookupRange(ranges, mode="rm") | deref() # returns ['a', 'a', 'b', 'b']
vs | lookupRange(ranges, mode="input") | deref() # returns [1, 1.5, 'a', 'a', 3, 3.5, 'b', 'b', 5, 5.5]
vs = list(zip([1, 1.5, 2, 2.5, 3, 3.5, 4, 4.5, 5, 5.5], "abcdefghij"))
vs | lookupRange(ranges, 0, mode="rm") | deref() # returns [['a', 'c'], ['a', 'd'], ['b', 'g'], ['b', 'h']]
So, ``ranges`` should be a table with 3 columns: start, stop and value. This cli will search across all ranges,
and if the input iterator has values within a single range, it will yield that range's value. The exact
comparison expression is "start <= input < stop". Internally, there're 2 implementations:
First implementation assumes the ranges are not overlapping, activated by "sorted=True". This will
assume the ranges are sorted based on the start values, then it searches for the value using binary
search. Time complexity is O(n*log(m)), where n is the input size, m is the ranges's length
Second implementation doesn't assume the ranges are not overlapping, activated by "sorted=False".
This won't sort the ranges, and searches for the value using linear search, yielding the first
range that contains the value. Time complexity is O(n*m)
See also: :class:`lookup`
:param ranges: table of size (N, 3), with each row (start, stop, value)
:param col: column to act upon
:param sorted: if True, use binary search, else use linear search. Explained more above
:param fill: if specified, and if no ranges contain the value, then yield this value instead
:param mode: explained above. See :class:`lookup` as well""" # lookupRange
try: ranges[:]; len(ranges) # lookupRange
except: ranges = ranges | deref(2) # lookupRange
if mode == "error" and fill is not None: mode = "fill" # lookupRange
if mode == "error" and fill == input: mode = "input" # lookupRange
self.ranges = ranges; self.col = col; self.sorted = sorted; self.fill = fill; self.mode = mode # lookupRange
if mode not in ("error", "rm", "fill", "input"): raise Exception(f".mode can only be 'error', 'rm', 'fill' or 'input'") # lookupRange
[docs] def __ror__(self, it): # lookupRange
ranges = self.ranges; col = self.col; fill = self.fill; mode = self.mode; sentinel = object(); it = init.dfGuard(it) # lookupRange
colIsNone = col is None; modeFill = mode == "fill"; modeRmOrError = mode == "rm" or mode == "error"; modeInput = mode == "input"; modeError = mode == "error" # lookupRange
def edit(row, value): row = list(row); row[col] = value; return row # lookupRange
if self.sorted: # lookupRange
for row in it: # lookupRange
v = row if col is None else row[col] # lookupRange
start = 0; end = len(ranges)-1; e = sentinel # lookupRange
while start <= end: # lookupRange
mid = round((start + end)/2) # lookupRange
r = ranges[mid] # lookupRange
if r[0] <= v < r[1]: e = r[2]; break # lookupRange
if v < r[0]: end = mid-1 # lookupRange
else: start = mid+1 # lookupRange
if colIsNone: # lookupRange
if modeFill: yield fill if e is sentinel else e # lookupRange
elif modeRmOrError and e is not sentinel: yield e # lookupRange
elif modeInput: yield v if e is sentinel else e # lookupRange
elif modeError: raise KeyError(f"Can't find element {v} in any ranges") # lookupRange
else: # lookupRange
if modeFill: row = list(row); row[col] = fill if e is sentinel else e; yield row # lookupRange
elif modeRmOrError and e is not sentinel: row = list(row); row[col] = e; yield row # lookupRange
elif modeInput: row = list(row); row[col] = v if e is sentinel else e; yield row # lookupRange
elif modeError: raise KeyError(f"Can't find element {v} in any ranges") # lookupRange
else: # lookupRange
for row in it: # lookupRange
v = row if col is None else row[col] # lookupRange
e = next((vv for x,y,vv in ranges if x <= v < y), sentinel) # lookupRange
if colIsNone: # lookupRange
if modeFill: yield fill if e is sentinel else e # lookupRange
elif modeRmOrError and e is not sentinel: yield e # lookupRange
elif modeInput: yield v if e is sentinel else e # lookupRange
elif modeError: raise KeyError(f"Can't find element {v} in any ranges") # lookupRange
else: # lookupRange
if modeFill: row = list(row); row[col] = fill if e is sentinel else e; yield row # lookupRange
elif modeRmOrError and e is not sentinel: row = list(row); row[col] = e; yield row # lookupRange
elif modeInput: row = list(row); row[col] = v if e is sentinel else e; yield row # lookupRange
elif modeError: raise KeyError(f"Can't find element {v} in any ranges") # lookupRange
[docs]class getitems(BaseCli): # getitems
[docs] def __init__(self, *fields, default=None): # getitems
"""Basically [input[x] for x in fields].
Example::
# returns [3, 1, '']
{"a": 1, "b": 2, "c": 3} | getitems("c", "a", "d")
""" # getitems
self.fields = fields; self.default = default # getitems
[docs] def __ror__(self, d): # getitems
ans = []; default = self.default # getitems
for f in self.fields: # getitems
try: ans.append(d[f]) # getitems
except: ans.append(default) # getitems
return ans # getitems
[docs]class backup(BaseCli): # backup
[docs] def __init__(self): # backup
"""Backs up a file/folder.
Example::
"some/folderOrFile" | backup()
"some/folderOrFile" | backup.restore()
Really straightforward. Uses bash internally to copy files recursively, so
not available on Windows.""" # backup
pass # backup
[docs] def __ror__(self, it): # backup
it = os.path.expanduser(it) # backup
None | cli.cmd(f"rm -rf '{it}.backup'") | cli.ignore() # backup
None | cli.cmd(f"cp -r '{it}' '{it}.backup'") | cli.ignore() # backup
[docs] @staticmethod # backup
def restore(): # backup
def inner(it): # backup
it = os.path.expanduser(it) # backup
None | cli.cmd(f"rm -rf '{it}'") | cli.ignore() # backup
None | cli.cmd(f"cp -r '{it}.backup' '{it}'") | cli.ignore() # backup
return cli.aS(inner) # backup
sketch_interceptor = {} # backup
[docs]class sketch(BaseCli): # sketch
_jsF_ctxIdx = None # sketch
[docs] def __init__(self, transforms:List[Callable]=[], titles:List[str]=None, im:bool=False, ncols:int=None, n:int=None, axes:int=None): # sketch
"""Convenience tool to plot multiple matplotlib plots at the same
time, while still keeping everything short and in 1 line. For this example,
we're trying to plot x^1, x^2, ..., x^8 on 2 separate plots, one left one
right. The left will have x^1 till x^4, the right will have x^5 to x^8.
How you would do this normally::
x = np.linspace(-2, 2); exps = range(1, 9)
fig, axes = plt.subplots(1, 2, figsize=(10, 4))
# simplest solution
plt.sca(axes[0]); plt.plot(x, x**1); plt.plot(x, x**2); plt.plot(x, x**3); plt.plot(x, x**4); plt.legend([1, 2, 3, 4]); plt.xlabel("x axis")
# solution using a little bit of cli
plt.sca(axes[1]); range(5, 9) | apply(lambda a: [x, x**a]) | ~apply(plt.plot) | ignore(); plt.legend([5, 6, 7, 8]); plt.xlabel("x axis")
But this is long, and I'm incredibly lazy to write it all out. So here's how
it's going to work using this cli::
# plotting the first 4 lines only, in a single plot. Should be familiar and make sense to you before moving on
exps | apply(lambda a: [x, x**a]) | batched(4) | item() | ~apply(plt.plot) | ignore()
# plotting 8 lines across 2 plots. Simplest example using sketch(). It kinda captures clis after it and use it to plot each plot
exps | apply(lambda a: [x, x**a]) | batched(4) | (sketch() | ~apply(plt.plot))
# same as above, but adding a grid and x axis label to all plots. Transformation functions can be anything you would
# put inside a normal cli (`plt` will be passed as argument): string code, op()-capture, lambda functions, other cli tools
transforms = ["x.grid(True)", op().xlabel("x axis"), lambda x: x.ylabel("y axis")]
exps | apply(lambda a: [x, x**a]) | batched(4) | (sketch(transforms) | ~apply(plt.plot))
# same as above, but adding legends. [x, x**a] will eventually be directed to ~apply(plt.plot), while f"x^{a}" will be directed to aS(plt.legend)
exps | apply(lambda a: [[x, x**a], f"x^{a}"]) | batched(4) | (sketch() | transpose() | ~apply(plt.plot) + iden() | deref() | rItem(1) | aS(plt.legend)) | deref()
Last line will generate this plot:
.. image:: ../images/probScale.png
Is it worth the extra confusion? Afterall, it just saves you 2-3 lines of
code. To me, it is worth it, because you can quickly change styles (add
a grid, make y axis log)
See also: :class:`~k1lib.cli.output.plotImgs`
Check out a gallery of more examples at `kapi/9-mpl <https://mlexps.com/kapi/9-mpl/>`_.
:param transforms: transform functions to be run when drawing every plot. ``plt`` (aka ``matplotlib.pyplot``) will be passed in
:param titles: if specified, use these titles for each plot. Kinda hacky I have to admit
:param im: if True, returns a PIL image and closes the sketch, else return nothing but still have the sketch open
:param ncols: if specified, will sketch with this number of columns
:param n: if specified, use this number of sketch instead of figuring out automatically
:param axes: if specified, forgo calculating #axes and initialization altogether and just use the provided axes""" # sketch
super().__init__(capture=True); self.titles = titles; self.im = im # sketch
self.transforms = [cli.fastF(t) for t in transforms]; self.ncols = ncols; self.n = n; self.axes = axes # sketch
[docs] def __ror__(self, it): # sketch
it = list(it); n = self.n or len(it); s = self.capturedSerial; transforms = self.transforms # sketch
ncols = self.ncols or math.ceil(n**0.5); nrows = math.ceil(n/ncols) # sketch
if self.axes: axes = self.axes # sketch
else: # sketch
fig, axes = plt.subplots(nrows, ncols, figsize=(ncols*5, nrows*4)) # sketch
if nrows*ncols == 1: axes = [axes] # sketch
if axes | cli.shape() | cli.shape(0) > 1: axes = axes.flatten() # sketch
for i, [ax, e, title] in enumerate(zip(axes, it, self.titles or ("" | cli.repeat()))): # sketch
plt.sca(ax); e | s | cli.deref() # sketch
if title: plt.title(title) # sketch
for trans in transforms: trans(plt) # sketch
if self.n is None: axes[i+1:] | cli.op().remove().all() | cli.deref(); plt.tight_layout() # sketch
if self.im: return plt.gcf() | cli.toImg() # sketch
if self.n: return axes[i+1:] # sketch
def _jsF(self, meta): # sketch
if self.n: raise Exception("sketch()._jsF() doesn't support .n parameter yet") # sketch
if self.axes: raise Exception("sketch()._jsF() doesn't support .axes parameter yet") # sketch
fIdx = init._jsFAuto(); dataIdx = init._jsDAuto(); ctxIdx = init._jsDAuto() # sketch
# generate all child functions here # sketch
sketch._jsF_ctxIdx = ctxIdx # sketch
header, _fIdx, _async = k1lib.kast.asyncGuard(self.capturedSerial._jsF(meta)) # sketch
# then generate all transforms here, using a tracing compiler # sketch
tfStmts = "" # sketch
if len(self.transforms) > 0: # sketch
class Interceptor: # sketch
def __getattr__(self, attr): # sketch
if getattr(plt, attr) not in sketch_interceptor: raise Exception(f"Transpiling function `plt.{attr}` is not supported at the moment") # sketch
return lambda *args, **kwargs: sketch_interceptor[getattr(plt, attr)](*args, **kwargs) # sketch
tfStmts = tfs = self.transforms | cli.apply(cli.init.fastF) | cli.op()(Interceptor()).all() | cli.join("; ") # sketch
sketch._jsF_ctxIdx = None # sketch
return f"""\
{ctxIdx} = null;\n{header}
{fIdx} = async ({dataIdx}) => {{ // dataIdx should have
const ctx = []; // this is the object that will be sent to the rendering server!
const titles = {json.dumps(self.titles)} ?? Array({dataIdx}.length);
for (const i of [...Array({dataIdx}.length).keys()]) {{
{ctxIdx} = [];
// actually executing function and plotting function downstream
{'await ' if _async else ''}{_fIdx}({dataIdx}[i]);
if (titles[i]) {ctxIdx}.push(["title", titles[i]]);
// inject all transforms here
{tfStmts};
ctx.push({ctxIdx}); {ctxIdx} = null;
}}
// console.log(ctx);
// console.log(JSON.stringify(ctx));
const res = await (await fetch("https://local.mlexps.com/routeServer/kapi_9-mpl", {{
method: "POST",
body: JSON.stringify({{ "ctx": JSON.stringify(ctx) }}),
headers: {{ "Content-Type": "application/json" }}
}})).json()
if (res.success) {{
const base64 = res.data;
console.log("mpl fetched");
return `<img src="data:image/jpg;base64, ${{base64}}" />`
}} else {{ throw new Error(res.reason); }}
// return ctx;
}}""", fIdx # sketch
return f"{fIdx} = ({dataIdx}) => {dataIdx}.repeatFrom({cli.kjs.v(self.limit)})", fIdx # sketch
def _jsF_plt_ctxGuard(): # _jsF_plt_ctxGuard
if sketch._jsF_ctxIdx is None: raise Exception("Have to wrap any plotting operations around sketch(). So, transform your code from `data | (toJsFunc() | ~aS(plt.plot))` into `[data] | (toJsFunc() | (sketch() | ~aS(plt.plot)))`") # _jsF_plt_ctxGuard
return sketch._jsF_ctxIdx # _jsF_plt_ctxGuard
try: import matplotlib.pyplot as plt; hasMpl = True # _jsF_plt_ctxGuard
except: hasMpl = False # _jsF_plt_ctxGuard
if hasMpl: # _jsF_plt_ctxGuard
def _jsF_plt_plot(meta, c=None): # _jsF_plt_ctxGuard
fIdx = init._jsFAuto(); xIdx = init._jsDAuto(); yIdx = init._jsDAuto(); ctxIdx = _jsF_plt_ctxGuard() # _jsF_plt_ctxGuard
return f"""\
{fIdx} = ({xIdx}, {yIdx}=null) => {{
if (!{yIdx}) {{ // handle only xIdx is available case
{yIdx} = {xIdx}; {xIdx} = [...Array({yIdx}.length).keys()];
}}
{ctxIdx}.push(["plot", {xIdx}, {yIdx}]);
}}""", fIdx # _jsF_plt_ctxGuard
settings.kjs.jsF[plt.plot] = _jsF_plt_plot # _jsF_plt_ctxGuard
def _jsF_plt_title(meta): # version that passes args in js side # _jsF_plt_ctxGuard
fIdx = init._jsFAuto(); titleIdx = init._jsDAuto(); ctxIdx = _jsF_plt_ctxGuard() # _jsF_plt_ctxGuard
return f"""{fIdx} = ({titleIdx}) => {{ {ctxIdx}.push(["title", {titleIdx}]); }}""", fIdx # _jsF_plt_ctxGuard
settings.kjs.jsF[plt.title] = _jsF_plt_title # below is version that passes args in python side, returns statement, instead of (header, fIdx) like usual # _jsF_plt_ctxGuard
sketch_interceptor[plt.title] = lambda title: f"""{_jsF_plt_ctxGuard()}.push(["title", `{title}`])""" # _jsF_plt_ctxGuard
def _jsF_plt_grid(meta): # _jsF_plt_ctxGuard
fIdx = init._jsFAuto(); tfIdx = init._jsDAuto(); ctxIdx = _jsF_plt_ctxGuard() # _jsF_plt_ctxGuard
return f"""{fIdx} = ({tfIdx}) => {{ {ctxIdx}.push(["grid", {tfIdx}]); }}""", fIdx # _jsF_plt_ctxGuard
settings.kjs.jsF[plt.grid] = _jsF_plt_grid; sketch_interceptor[plt.grid] = lambda tf=True: f"""{_jsF_plt_ctxGuard()}.push(["grid", {cli.kjs.v(tf)}])""" # _jsF_plt_ctxGuard
def _jsF_plt_legend(meta, framealpha=1): # _jsF_plt_ctxGuard
fIdx = init._jsFAuto(); legendIdx = init._jsDAuto(); ctxIdx = _jsF_plt_ctxGuard() # _jsF_plt_ctxGuard
return f"""{fIdx} = ({legendIdx}) => {{ {ctxIdx}.push(["legend", {legendIdx}, {framealpha}]); }}""", fIdx # _jsF_plt_ctxGuard
settings.kjs.jsF[plt.legend] = _jsF_plt_legend; sketch_interceptor[plt.legend] = lambda legend=None, framealpha=1: f"""{_jsF_plt_ctxGuard()}.push(["legend", {cli.kjs.v(legend)}, {cli.kjs.v(framealpha)}])""" # _jsF_plt_ctxGuard
# _jsF_plt_ctxGuard
sketch_interceptor[plt.xlim] = lambda left=None, right=None: f"""{_jsF_plt_ctxGuard()}.push(["xlim", {cli.kjs.v(left)}, {cli.kjs.v(right)}])""" # _jsF_plt_ctxGuard
sketch_interceptor[plt.ylim] = lambda bottom=None, top=None: f"""{_jsF_plt_ctxGuard()}.push(["ylim", {cli.kjs.v(bottom)}, {cli.kjs.v(top)}])""" # _jsF_plt_ctxGuard
sketch_interceptor[plt.xscale] = lambda scale: f"""{_jsF_plt_ctxGuard()}.push(["xscale", {cli.kjs.v(scale)}])""" # _jsF_plt_ctxGuard
sketch_interceptor[plt.xlabel] = lambda label: f"""{_jsF_plt_ctxGuard()}.push(["xlabel", {cli.kjs.v(label)}])""" # _jsF_plt_ctxGuard
sketch_interceptor[plt.ylabel] = lambda label: f"""{_jsF_plt_ctxGuard()}.push(["ylabel", {cli.kjs.v(label)}])""" # _jsF_plt_ctxGuard
import numbers, sys; from collections import deque # _jsF_plt_ctxGuard
[docs]class syncStepper(BaseCli): # syncStepper
[docs] def __init__(self, col=0, sort=False): # syncStepper
"""Steps forward all streams at a time, yielding same results from min to max.
That's a bit vague, so let's see an example::
a = [["a", 1], ["b", 7 ], ["c", 4], ["e", 6]]
b = [["b", 5], ["c", 1 ], ["d", 3], ["f", 5]]
c = [["a", 2], ["c", -4], ["d", 9], ["e", 4]]
[a, b, c] | syncStepper() | deref() # sync-step by the 1st column
[a, b, c] | syncStepper(1, True) | deref() # sync-step by the 2nd column. Have to sort it explicitly
The first line returns this::
[[['a', 1], None, ['a', 2]],
[['b', 7], ['b', 5], None],
[['c', 4], ['c', 1], ['c', -4]],
[None, ['d', 3], ['d', 9]],
[['e', 6], None, ['e', 4]],
[None, ['f', 5], None]]
The second line returns this::
[[None, None, ['c', -4]],
[['a', 1], ['c', 1], None],
[None, None, ['a', 2]],
[None, ['d', 3], None],
[['c', 4], None, ['e', 4]],
[None, ['b', 5], None],
[['e', 6], None, None],
[['b', 7], None, None],
[None, None, ['d', 9]]]
``col`` can be None, but it's quite a strange use case::
[['a', 'b', 'c', 'e'], ['b', 'c', 'd', 'f'], ['a', 'c', 'd', 'e']] | syncStepper(None) | deref()
It returns this::
[[['a'], None, ['a']],
[['b'], ['b'], None],
[['c'], ['c'], ['c']],
[None, ['d'], ['d']],
[['e'], None, ['e']],
[None, ['f'], None]]
As you can see, for each line, it kinda yields elements with the same column. If
that element doesn't exist, it'll just put None there. This expects the input
streams are sorted at the column of interest. If they are not, specify ``sort=True``.
It has roughly the same vibe as :class:`~k1lib.cli.structural.groupBy`, in that
it groups everything by a specific column. The main difference here is that you
can sync-step them line-by-line, loading very little into memory, so you can run
this on giant datasets and not have to worry about running out of memory.
With k streams each having n elements, you should expect memory complexity to be
O(k), and the time complexity to be O(n*k^2/2). That k^2 term is kinda worrying,
but in most use cases, k is small and so k^2 can be treated as a constant
See also: :class:`~k1lib.cli.structural.latch`
:param col: column where it should compare values and merge them together. Can be None, but that would be quite a weird use case
:param sort: whether to sort the streams or not. This cli requires it, but it's
not turned on by default because it's an intensive operation""" # syncStepper
if col is None: self.col = 0; self.colPreprocess = cli.wrapList().all() # syncStepper
else: self.col = col; self.colPreprocess = cli.iden() # syncStepper
self.bank = deque(); self.sentinel = object(); self._sort = sort # syncStepper
def _append(self, stIdx1, val1, elem1): # append to bank in the correct position # syncStepper
i = 0; val2 = self.minObj # syncStepper
for i, [stIdx2, val2, elem2] in enumerate(self.bank): # syncStepper
if val1 <= val2: break # syncStepper
if val1 <= val2: self.bank.insert(i, [stIdx1, val1, elem1]) # syncStepper
else: self.bank.append([stIdx1, val1, elem1]) # syncStepper
def _yieldNext(self): # yield the next set of values # syncStepper
n = len(self.sts); res = [None]*n; last = None; hasInit = False; changed = False; bank = self.bank; sentinel = self.sentinel # syncStepper
for i, [stIdx, val, elem] in enumerate(bank): # syncStepper
if not hasInit and elem is sentinel: return res, changed # syncStepper
if last == val or not hasInit: changed = True; res[stIdx] = elem # syncStepper
elif hasInit: break # syncStepper
hasInit = True; last = val # syncStepper
while bank[0][1] == last: # popping the values off # syncStepper
stIdx, val1, elem1 = bank.popleft(); val2, elem2 = next(self.sts[stIdx]) # syncStepper
if val1 > val2: raise Exception(f"Stream {stIdx} has not been sorted yet! Please sort all streams before passing it into syncStepper") # syncStepper
self._append(stIdx, val2, elem2) # syncStepper
return res, changed # syncStepper
[docs] def __ror__(self, sts): # sts = "streams" # syncStepper
col = self.col; it = init.dfGuard(it) # syncStepper
# --------------------- All of this is just to figure out the type of the column dynamically. So painful --------------------- # syncStepper
samples, sts = sts | self.colPreprocess.all() | cli.apply(cli.peek()) | cli.transpose() | cli.cut(col) + cli.iden() | cli.apply(list) # syncStepper
if len([e for e in sts if e != []]) == 0: return # no elements to yield at all! # syncStepper
n_nums = sum([1 if isinstance(e, numbers.Number) else 0 for e in samples]) # syncStepper
n_strs = sum([1 if isinstance(e, str) else 0 for e in samples]); n = len(samples) # syncStepper
if n_nums*(n-n_nums) + n_strs*(n-n_strs) > 0: raise Exception("The requested column in some of the streams is not purely of numeric or string type, a requirement of syncStepper(). Please fix your data structure and try again.") # syncStepper
if n_nums + n_strs == 0: raise Exception("The requested column in some of the streams is not of numeric or string type, so can't compare them to sync-step them") # syncStepper
# n = 3; n_strs = 1 # syncStepper
text = n_strs > 0; self.minObj = "" if text else float("-inf"); self.maxObj = chr(sys.maxunicode) if text else float("inf"); senObj = [self.maxObj, self.sentinel] # syncStepper
# --------------------- And here's the meat of the cli --------------------- # syncStepper
sts = sts | (cli.sort(col, not text).all() if self._sort else cli.iden()) | cli.apply(lambda st: [st | cli.apply(lambda elem: [elem[col], elem]), senObj | cli.repeat()] | cli.joinStreams()) | cli.aS(list) # syncStepper
sts | cli.apply(next) | cli.insertIdColumn() | ~cli.apply(lambda idx,e: self._append(idx, *e)) | cli.ignore(); self.sts = sts # syncStepper
while True: # syncStepper
res, changed = self._yieldNext() # syncStepper
if not changed: break # syncStepper
yield res # syncStepper
[docs]class zeroes(BaseCli): # zeroes
[docs] def __init__(self, col:int=None, log=False, offset:float=0): # zeroes
"""Shift the specified column so that the first element is zero
Example::
range(13, 20) | zeroes() | deref() # returns [0, 1, 2, 3, 4, 5, 6]
range(13, 20) | zeroes(offset=5) | deref() # returns [5, 6, 7, 8, 9, 10, 11]
[2, 3, 1, 4, 7] | zeroes() | deref() # returns [0, 1, -1, 2, 5]
Assumes the first element is going to be transformed to zero, thus the last example.
This cli also has log mode, where the natural log of the values will be shifted to zero::
# returns [1.0, 1.5, 0.5, 2.0, 3.5]
[2, 3, 1, 4, 7] | zeroes(log=True) | aS(round, 2).all() | deref()
# returns [2.72, 4.08, 1.36, 5.44, 9.51]
[2, 3, 1, 4, 7] | zeroes(offset=1, log=True) | aS(round, 2).all() | deref()
This is essentially the same as dividing everything by 2, so that the first element
turns into 1. Super neat. The 2nd example is equivalent to multiplying everything by e/2.
This cli can function in a table (.col != None)::
# returns [[0, 'a'], [1, 'b'], [2, 'c'], [3, 'd'], [4, 'e'], [5, 'f'], [6, 'g']]
[[13, 'a'], [14, 'b'], [15, 'c'], [16, 'd'], [17, 'e'], [18, 'f'], [19, 'g']] | zeroes(0) | deref()
This cli can also act across multiple list of numbers::
data = [[2, 3, 1, 4, 7], [1, 4, 3, 6, 9]]
data2 = [[[2, 'b'], [3, 'c'], [1, 'a'], [4, 'd'], [7, 'g']], [[1, 'a'], [4, 'd'], [3, 'c'], [6, 'f'], [9, 'i']]]
# returns [[0, 1, -1, 2, 5], [5, 8, 7, 10, 13]]
data | ~zeroes() | deref()
# returns [[1, 2, 0, 3, 6], [6, 9, 8, 11, 14]]
data | ~zeroes(offset=1) | deref()
# returns [[1.0, 1.5, 0.5, 2.0, 3.5], [3.5, 14.0, 10.5, 21.0, 31.5]]
data | ~zeroes(log=True) | aS(round, 2).all(2) | deref()
# returns [[[0, 'b'], [1, 'c'], [-1, 'a'], [2, 'd'], [5, 'g']], [[5, 'a'], [8, 'd'], [7, 'c'], [10, 'f'], [13, 'i']]]
data2 | ~zeroes(0) | deref()
So as you can see, the offsets are adjusted so that the first element of each list
starts from the last element of the previous list
:param col: column to shift values
:param offset: custom offset of the minimum value, defaulted to zero
:param log: whether to zero it linearly or zero it logarithmically""" # zeroes
self.col = col; self.log = log; self.offset = offset; self.inverted = False # zeroes
[docs] def __invert__(self): res = zeroes(self.col, self.log, self.offset); res.inverted = True; return res # zeroes
[docs] def __ror__(self, it): # zeroes
col = self.col; log = self.log; offset = self.offset; it = init.dfGuard(it) # zeroes
if self.inverted: # zeroes
def gen(): # zeroes
currentOffset = offset # zeroes
for arr in it: # zeroes
arr = arr | zeroes(col, log, currentOffset) # zeroes
if isinstance(arr, settings.arrayTypes): # zeroes
bm = np if isinstance(arr, np.ndarray) else (torch if hasTorch and isinstance(arr, torch.Tensor) else None) # zeroes
if bm: # zeroes
if col is None: currentOffset = bm.log(arr[-1]) if log else arr[-1] # zeroes
else: currentOffset = bm.log(arr[-1][col]) if log else arr[-1][col] # zeroes
yield arr; continue # zeroes
# yes, we have to deref() them, even though perf will suffer, because let's say # zeroes
# that the user then does rItem(3), and discards elements 0, 1 and 2. Then 0, 1, 2 # zeroes
# won't be run, so element 3 won't know its offset! # zeroes
if col is None: arr = list(arr); currentOffset = math.log(arr[-1]) if log else arr[-1] # zeroes
else: arr = [list(row) for row in arr]; currentOffset = math.log(arr[-1][col]) if log else arr[-1][col] # zeroes
yield arr # zeroes
return gen() # zeroes
if isinstance(it, settings.arrayTypes): # zeroes
bm = np if isinstance(it, np.ndarray) else (torch if hasTorch and isinstance(it, torch.Tensor) else None) # zeroes
if bm: # zeroes
cloneF = np.copy if isinstance(it, np.ndarray) else torch.clone # zeroes
if log: # zeroes
if col is None: minValue = bm.log(it[0]) - offset; return bm.exp(bm.log(it) - minValue) # zeroes
else: minValue = bm.log(it[0, col]) - offset; it = cloneF(it); it[:,col] = bm.exp(bm.log(it[:,col]) - minValue); return it # zeroes
else: # zeroes
if col is None: minValue = it[0] - offset; return it - minValue # zeroes
else: minValue = it[0, col] - offset; it = cloneF(it); it[:,col] = it[:,col] - minValue; return it # zeroes
row, it = it | cli.peek() # zeroes
if it == []: return [] # zeroes
if log: # zeroes
mlog = math.log; mexp = math.exp # zeroes
if col is None: minValue = mlog(row) - offset; return (mexp(mlog(row) - minValue) for row in it) # zeroes
else: minValue = mlog(row[col]) - offset; return ([*row[:col], mexp(mlog(row[col]) - minValue), *row[col+1:]] for row in it) # zeroes
else: # zeroes
if col is None: minValue = row - offset; return (row - minValue for row in it) # zeroes
else: minValue = row[col] - offset; return ([*row[:col], row[col] - minValue, *row[col+1:]] for row in it) # zeroes
[docs]class normalize(BaseCli): # normalize
[docs] def __init__(self, col:int=None, mode:int=0): # normalize
"""Normalize the data going in.
Example::
arr = np.random.randn(100)+10
arr | normalize() # returns array with mean around 0
arr | normalize(mode=1) # returns array with mean around 0.5, min 0, max 1
arr = np.random.randn(100, 20)+10
arr | normalize(2) # returns array with 2nd (0-indexing!) column have mean around 0. Other columns not touched
arr | normalize(2, mode=1) # returns array with 2nd (0-indexing!) column have mean around 0.5
:param col: column to apply the normalization to
:param mode: 0 for ``(x - x.mean())/s.std()``, 1 for ``(x - x.min())/(x.max() - x.min())``""" # normalize
self.col = col; self.mode = mode # normalize
def _all_array_opt(self, it, level): # normalize
col = self.col; n = len(it.shape); s = slice(None, None, None) # normalize
if col is None: # normalize
# (*level, N, *rest (>0)) -> (*level, N, rest) -> (*level, N*rest) -> (*level) (this is mean & std) -> (*level, N, *rest) # normalize
if level+1 == len(it.shape): it = it[(*[s]*len(it.shape), None)]; n += 1 # normalize
b = it | cli.joinSt(n-level-2).all(level+1); c = b | cli.joinSt().all(level) # normalize
if self.mode == 0: # normalize
mean = c.mean(level)[(*[s]*level,None,None)] # normalize
std = c.std(level)[(*[s]*level,None,None)] # normalize
return ((b - mean)/std).reshape(it.shape) # normalize
else: # normalize
min_ = c.min(level)[(*[s]*level,None,None)] # normalize
max_ = c.max(level)[(*[s]*level,None,None)] # normalize
return ((b - min_)/(max_ - min_)).reshape(it.shape) # normalize
else: # normalize
# (*level, N, F, *rest (>0)) -> (*level, N, *rest) -> (*level, N, rest) -> (*level, N*rest) -> (*level) (this is mean & std) -> (*level, N, F, *rest) # normalize
a = np.copy(it) if isinstance(it, np.ndarray) else torch.clone(it); unsqueezed = False; s = slice(None, None, None) # normalize
if level+2 == len(a.shape): a = a[(*[s]*len(a.shape), None)]; unsqueezed = True; n += 1 # normalize
b = a[(*[slice(None,None,None)]*(level+1),col)] | cli.joinSt(n-level-3).all(level+1) # (*level, N, rest (>0, hence unsqueeze)) # normalize
c = b | cli.joinSt(len(b.shape)-level-1).all(level) # (*level, N*rest) # normalize
if self.mode == 0: # normalize
mean = c.mean(level)[(*[s]*level,None,None)] # normalize
std = c.std(level)[(*[s]*level,None,None)] # normalize
b[:] = (b - mean)/std # normalize
else: # normalize
min_ = c.min(level)[(*[s]*level,None,None)] # normalize
max_ = c.max(level)[(*[s]*level,None,None)] # normalize
b[:] = (b - min_)/(max_ - min_) # normalize
return (a | cli.joinSt().all(len(a.shape)-2)) if unsqueezed else a # normalize
return NotImplemented # normalize
[docs] def __ror__(self, x): # normalize
col = self.col; mode = self.mode; x = init.dfGuard(x) # normalize
if isinstance(x, k1lib.settings.cli.arrayTypes): # normalize
dims = len(x.shape) # normalize
if col is None: return (x - x.mean())/x.std() if mode == 0 else (x - x.min())/(x.max() - x.min()) # normalize
else: # normalize
if mode == 0: xc = x[:,col]; x[:,col] = (xc - xc.mean())/xc.std(); return x # normalize
else: xc = x[:,col]; x[:,col] = (xc - xc.min())/(xc.max() - xc.min()); return x # normalize
if col is None: return np.array(list(x)) | self # normalize
else: # normalize
it = x; ans = []; it = it | cli.deref(2) # normalize
if len(it) == 0: return [] # normalize
if mode == 0: # normalize
mean = [row[col] for row in it] | cli.toMean() # normalize
std = [row[col] for row in it] | cli.toStd() # normalize
for row in it: row[col] = (row[col]-mean)/std; ans.append(row) # normalize
else: # normalize
_min = min([row[col] for row in it]) # normalize
_max = max([row[col] for row in it]) # normalize
for row in it: row[col] = (row[col]-_min)/(_max-_min); ans.append(row) # normalize
return ans # normalize
[docs]class branch(BaseCli): # branch
[docs] def __init__(self, f, f1, f2): # branch
"""Works like an if statement, for when you don't want to make a separate
function as it's too time consuming.
3 | branch(lambda x: x>2, lambda x: x+4, lambda x: x+5) # returns 7
3 | branch(op()>2, op()+4, op()+5) # returns 7
3 | branch("x>2", "x+4", "x+5") # returns 7
3 | aS(lambda x: (x + 4) if (x > 2) else (x + 5)) # returns 7
So all of them kinda does the same thing as the 4th line. Is it worth it? Debatable, but I've
had so many times that I have to wrap things in parenthesis around expressions to make sure
it's not doing anything weird and that takes long enough to disrupt my thought process
that I kinda have to make this
:param f: predicate function. If returns True, use the first function (f1), else use the second function (f2)""" # branch
self.f = f; self._fC = cli.fastF(f) # branch
self.f1 = f1; self._fC1 = cli.fastF(f1) # branch
self.f2 = f2; self._fC2 = cli.fastF(f2) # branch
[docs] def __ror__(self, it): return self._fC1(it) if self._fC(it) else self._fC2(it) # branch