This is for functions that change the table structure in a
dramatic way. They're the core transformations.
from typing import List, Union, Iterator, Callable, Any, Tuple, Dict
from collections import defaultdict, Counter, deque
from k1lib.cli.init import patchDefaultDelim, BaseCli, oneToMany, fastF, yieldT
import k1lib.cli as cli; import k1lib.cli.init as init; from k1lib.cli.typehint import *
import itertools, numpy as np, k1lib, math, json; plt = k1lib.dep.plt
# torch is an optional dependency. `hasTorch` records whether the real package was imported.
try:
    import torch
    hasTorch = True
except Exception:
    # Placeholder standing in for the torch module when it is missing; presumably it
    # auto-declares attributes such as `torch.Tensor` as fresh dummy classes so that
    # isinstance() checks in this module keep working - TODO confirm against k1lib.Object.
    torch = k1lib.Object().withAutoDeclare(lambda: type("RandomClass", (object, ), {}))
    hasTorch = False  # the import failed, so torch is NOT available
try: import pandas as pd; pd.core; hasPandas = True
except: hasPandas = False
__all__ = ["transpose", "T", "reshape", "insert", "splitW", "splitC",
"joinStreams", "joinSt", "joinStreamsRandom", "activeSamples",
"table", "batched", "batchedTrigger", "window", "groupBy", "ungroup",
"insertColumn", "insertIdColumn", "insId",
"count", "hist", "permute", "AA_", "peek", "peekF",
"repeat", "repeatF", "repeatFrom", "oneHot", "latch"]
settings = k1lib.settings.cli
class transpose(BaseCli):
    def __init__(self, dim1:int=0, dim2:int=1, fill=None):
        """Join multiple columns and loop through all rows. Aka transpose.
        Example::

            # returns [[1, 4], [2, 5], [3, 6]]
            [[1, 2, 3], [4, 5, 6]] | transpose() | deref()
            # returns [[1, 4], [2, 5], [3, 6], [0, 7]]
            [[1, 2, 3], [4, 5, 6, 7]] | transpose(fill=0) | deref()

        Multidimensional transpose works just like :meth:`torch.transpose` too::

            # returns (2, 7, 5, 3), but detected Tensor, so it will use builtin :meth:`torch.transpose`
            torch.randn(2, 3, 5, 7) | transpose(3, 1) | shape()
            # also returns (2, 7, 5, 3), but actually does every required computation. Can be slow if shape is huge
            torch.randn(2, 3, 5, 7) | deref(igT=False) | transpose(3, 1) | shape()

        Can also work with numpy arrays::

            # returns (5, 3, 2)
            np.random.randn(2, 3, 5) | transpose(0, 2) | op().shape

        Be careful with infinite streams, as transposing stream of shape (inf, 5) will
        hang this operation! Either don't do it, or temporarily limit all infinite streams like
        this::

            with settings.cli.context(inf=21):
                # returns (3, 21)
                [2, 1, 3] | repeat() | transpose() | shape()

        Also be careful with empty streams, as you might not get any results at all::

            # returns [], as the last stream has no elements
            [[1, 2], [3, 4], []] | transpose() | deref()
            # returns [[1, 3, 0], [2, 4, 0]]
            [[1, 2], [3, 4], []] | transpose(fill=0) | deref()

        This also has an alias of ``T()``, so you can make it shorter, like ``np.random.randn(2, 3, 5) | T()``

        :param dim1: first dimension to swap
        :param dim2: second dimension to swap
        :param fill: if not None, then will try to zip longest with this fill value"""
        super().__init__()
        # NOTE: this instance attribute shadows the `transpose.fill` static method on instances
        self.fill = fill
        # dimensions are stored in ascending order, so (3, 1) and (1, 3) behave the same
        self.d1 = min(dim1, dim2); self.d2 = max(dim1, dim2)
        self.normal = self.d1 == 0 and self.d2 == 1  # True for the plain 2d transpose
    def _all_array_opt(self, it, level:int):
        """Array fast path used when this cli is applied ``level`` dimensions deep: shifts both dims by ``level``."""
        return it | transpose(self.d1 + level, self.d2 + level, self.fill)
    def _typehint(self, inp):
        """Returns the output type hint given the input type hint ``inp``."""
        if isinstance(inp, tArrayTypes):
            if inp.rank is None: return inp
            if inp.rank > max(self.d1, self.d2): return inp
            else: return tAny()  # this case doesn't quite exist
        if self.d1 == 0 and self.d2 == 1:
            if isinstance(inp, tListIterSet):
                if isinstance(inp.child, tListIterSet):
                    return tIter(tList(inp.child.child))
        return tAny()
    def __ror__(self, it):
        """Transposes ``it``. Tensors, numpy arrays and data frames get dedicated
        handling; everything else is zipped (or zip-longest'ed when ``fill`` is given)."""
        d1 = self.d1; d2 = self.d2; fill = self.fill
        if isinstance(it, torch.Tensor): return it.transpose(d1, d2)
        if isinstance(it, np.ndarray):
            # numpy wants a full permutation, so build the identity and swap the 2 axes
            dims = list(range(len(it.shape)))
            dims[d1], dims[d2] = dims[d2], dims[d1]
            return it.transpose(dims)
        if hasPandas and isinstance(it, pd.core.frame.DataFrame):
            if d1 != 0 or d2 != 1: raise Exception("Can't do multidimensonal transpose on a data frame, as that makes no sense")
            return [it[k] for k in it]  # list of columns
        if d1 != 0 or d2 != 1:
            # compose the general swap out of plain (0, 1) transposes applied at successive depths
            forward = [transpose(fill=fill).all(i) for i in range(d1, d2)]
            backward = [transpose(fill=fill).all(i-1) for i in range(d2-1, d1, -1)]
            return it | cli.serial(*(forward + backward))
        if hasPandas:
            try:
                it[:]; len(it)  # only sliceable, sized inputs are inspected; iterators raise and fall through
                if sum(isinstance(x, pd.core.frame.Series) for x in it[:5]) >= 2:  # list of series, so can turn it into a dataframe!
                    taken = {name for name in (getattr(x, "name", None) for x in it) if name}
                    def gen():
                        # yields "col_..." style names that don't clash with existing column names
                        autoInc = k1lib.AutoIncrement(prefix="col_")
                        while True:
                            e = autoInc()
                            if e not in taken: yield e
                    nameGen = gen()
                    def colName(e):
                        # A Series always has a `.name` attribute (possibly None), so the generated
                        # name has to be chosen on None, not on the attribute being absent.
                        name = getattr(e, "name", None)
                        return next(nameGen) if name is None else name
                    return pd.DataFrame({colName(e): e for e in it})
            except Exception: pass  # best effort only: fall back to the generic zip below
        if fill is None: return zip(*it)
        return itertools.zip_longest(*it, fillvalue=fill)
    @staticmethod
    def fill(fill="", dim1:int=0, dim2:int=1):
        """Convenience method to fill in missing elements of a table.
        Example::

            # returns [[1, 2, 3], [4, 5, 0]]
            [[1, 2, 3], [4, 5]] | transpose.fill(0) | deref()
            # also returns [[1, 2, 3], [4, 5, 0]], demonstrating how it works underneath
            [[1, 2, 3], [4, 5]] | transpose(fill=0) | transpose(fill=0) | deref()

        :param fill: value used for the missing elements
        :param dim1: first dimension to swap
        :param dim2: second dimension to swap"""
        return transpose(dim1, dim2, fill=fill) | transpose(dim1, dim2, fill=fill)
    @staticmethod
    def wrap(f, dim1:int=0, dim2:int=1, fill=None):
        """Wraps ``f`` around 2 :class:`transpose`, can be useful in combination with
        :class:`k1lib.cli.init.mtmS`. Example::

            # returns [[1, 4, 3, 4], [8, 81, 10, 11]]
            [range(1, 5), range(8, 12)] | transpose.wrap(mtmS.f(apply(op()**2), 1)) | deref()
            # also returns [[1, 4, 3, 4], [8, 81, 10, 11]], demonstrating the typical way to do this
            [range(1, 5), range(8, 12)] | apply(op()**2, 1) | deref()

        The example given is sort of to demonstrate this only. Most of the time, just use
        :class:`~k1lib.cli.modifier.apply` with columns instead. But sometimes you need direct
        access to a column, so this is how you can do it.

        :param f: cli (or plain function, which gets wrapped in ``applyS``) to run on the transposed data"""
        if not isinstance(f, BaseCli): f = cli.applyS(f)
        return transpose(dim1, dim2, fill) | f | transpose(dim1, dim2, fill)
    def _jsF(self, meta):
        """Transpiles this cli to JavaScript. Only the plain (0, 1) transpose is supported."""
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()
        if self.d1 != 0 or self.d2 != 1: raise Exception(f"transpose._jsF() doesn't allow complex transpose across many dimensions yet")
        return f"{fIdx} = ({dataIdx}) => {dataIdx}.transpose({cli.kjs.v(self.fill)})", fIdx
#tOpt.clearPasses() # transpose
def oTranspose1(cs, ts, metadata):
    """Optimization pass that rewrites ``transpose() | transpose().all()`` into
    ``transpose() | apply(aS(torch.stack) | transpose())``.

    :param cs: the 2 matched clis, a :class:`transpose` followed by an ``apply``
    :param ts: type hints; only the first one (the input type) is inspected
    :param metadata: unused here
    :return: replacement list of clis, or None if the pass doesn't apply"""
    outerT, applied = cs; inType = ts[0]
    if not (applied.normal and outerT.normal): return None
    inner = applied.f
    if not (isinstance(inner, transpose) and inner.normal): return None
    if not isinstance(inType, tListIterSet): return None
    child = inType.child
    # only fires when the elements are tensors, or collections whose lowest common type is a tensor
    tensorLike = isinstance(child, tTensor) or (isinstance(child, tCollection) and isinstance(tLowest(*child.children), tTensor))
    if tensorLike:
        return [transpose(), cli.apply(cli.aS(torch.stack) | transpose())]
    return None
tOpt.addPass(oTranspose1, [transpose, cli.apply])
T = transpose
def _formStructure(it, dims, dimI): # _formStructure
if dimI >= len(dims): return next(it) # _formStructure
return [_formStructure(it, dims, dimI+1) for i in range(dims[dimI])] # _formStructure
class reshape(BaseCli):
    def __init__(self, *dims):
        """Reshapes the input stream into the desired shape.
        Example::

            # returns [[0, 1, 2], [3, 4, 5]]
            range(6) | reshape(2, 3) | deref()
            # returns [[0, 1], [2, 3], [4, 5]]
            range(6) | reshape(3, 2) | deref()
            # returns [[0, 1], [2, 3], [4, 5]], stopped early
            range(100) | reshape(3, 2) | deref()
            # returns [[0, 1, 2], [3, 4, 5]], can leave out first dimension
            range(6) | reshape(-1, 3) | deref()
            # returns [[0, 1, 2]], won't include 2nd element, as it ran out of elements
            range(5) | reshape(-1, 3) | deref()
            # throws error, as it ran out of elements and can't fulfill the request
            range(6) | reshape(3, 3) | deref()

        Unlike :meth:`torch.reshape`, the input piped into this has to be a simple iterator.
        If you have a complex data structure with multiple dimensions, turn that into a simple
        iterator with :class:`joinStreams` first, like this::

            # returns [[[0, 1, 2]], [[3, 4, 5]]]
            [[[0], [1]], [[2], [3]], [[4], [5]]] | joinStreams(2) | reshape(2, 1, 3) | deref()

        :param dims: size of each dimension. Only the first one can be -1, meaning "as many as available" """
        self.dims = dims
    def __ror__(self, it):
        """Lazily yields one top-level element of the reshaped structure at a time."""
        source = iter(init.dfGuard(it)); shape = self.dims
        if shape[0] != -1:
            # fixed first dimension: running out of elements is not swallowed here
            for _ in range(shape[0]):
                yield _formStructure(source, shape, 1)
            return
        # open-ended first dimension: keep going until the source is exhausted
        try:
            while True:
                yield _formStructure(source, shape, 1)
        except StopIteration:
            pass
class insert(BaseCli):
    def __init__(self, element, begin=True):
        """Join element into list.
        Example::

            # returns [5, 2, 6, 8]
            [2, 6, 8] | insert(5) | deref()
            # returns [2, 6, 8, 5]
            [2, 6, 8] | insert(5, begin=False) | deref()
            # returns [[3, 1], 2, 6, 8]
            [2, 6, 8] | insert([3, 1]) | deref()

        :param element: the element to insert
        :param begin: if True, inserts at the beginning, else at the end"""
        super().__init__(); self.element = element; self.begin = begin; self.expand = False
    def _all_array_opt(self, it, level):
        """Array fast path when applied ``level`` dimensions deep. Returns NotImplemented
        if the element's shape doesn't match the array's trailing dimensions."""
        element = self.element
        if (it | cli.shape())[level+1:] != (element | cli.shape()): return NotImplemented
        # convert the element to the same array flavour and prepend level+1 singleton dims
        e = (element | (cli.toNdArray(it.dtype) if isinstance(it, np.ndarray) else cli.toTensor(it.dtype)))[(*[None]*(level+1),)]
        # then repeat it along every leading dimension so it lines up with `it`
        for i in range(level): e = e | cli.repeatFrom(it.shape[i]).all(i)
        return (np if isinstance(it, np.ndarray) else torch).concatenate([e, it] if self.begin else [it, e], level)
    def __ror__(self, it):
        """Inserts the element into ``it``. Arrays, data frames, series and sliceable
        inputs return a concrete object of a matching kind; anything else returns a generator."""
        element = self.element; begin = self.begin
        if isinstance(it, settings.arrayTypes):
            if (it | cli.shape())[1:] == (element | cli.shape()):
                if isinstance(it, np.ndarray):
                    b = (element | cli.toNdArray())[None]
                    return np.concatenate([b, it] if begin else [it, b], axis=0)
                else:
                    b = (element | cli.toTensor())[None]
                    return torch.concatenate([b, it] if begin else [it, b], axis=0)
        if hasPandas and isinstance(it, pd.core.frame.DataFrame):
            try:
                # the element is treated as a new row, paired up with the columns in order
                newE = pd.DataFrame({c:[e] for c,e in zip(list(it), element)})
                return pd.concat([newE, it] if begin else [it, newE], ignore_index=True)
            except Exception: it = init.dfGuard(it)  # best effort: fall back to the generic paths below
        if hasPandas and isinstance(it, pd.core.frame.Series):
            try:
                newE = pd.Series([element])
                # honour `begin` here too, like every other branch does
                return pd.concat([newE, it] if begin else [it, newE], ignore_index=True)
            except Exception: pass
        # if input is sliceable, then it must mean it's the output of some process that's not
        # very heavy, so let's return a list, instead of going through an iterator
        try: it[:]; len(it); return [element, *it] if begin else [*it, element]
        except Exception: pass
        def gen():
            _it = iter(it)
            if begin: yield element; yield from _it
            else: yield from _it; yield element
        return gen()
    def _jsF(self, meta):
        """Transpiles this cli to JavaScript. The element has to be JSON-serializable."""
        fIdx = init._jsFAuto(); elemIdx = init._jsDAuto(); dataIdx = init._jsDAuto()
        return f"{elemIdx} = {json.dumps(self.element)}; {fIdx} = ({dataIdx}) => {dataIdx}.insert({elemIdx}, {cli.kjs.v(self.begin)})", fIdx
class splitW(BaseCli):
    def __init__(self, *weights:List[float]):
        """Splits elements into multiple weighted lists. If no weights are provided,
        then automatically defaults to [0.8, 0.2]. Example::

            # returns [[0, 1, 2, 3, 4, 5, 6, 7], [8, 9]]
            range(10) | splitW(0.8, 0.2) | deref()
            # same as the above
            range(10) | splitW() | deref()

        This also works with array types::

            torch.randn(100, 3) | splitW() # returns 2 tensors with shapes (80, 3) and (20, 3)

        See also: :class:`splitC`

        :param weights: relative size of each section"""
        super().__init__()
        if len(weights) == 0: weights = [0.8, 0.2]
        self.weights = np.array(weights)
    def __ror__(self, it):
        """Slices ``it`` into consecutive sections. Array-like inputs get a list back,
        everything else gets a generator of slices."""
        try: it[:]; len(it)
        except: it = list(it)  # not sliceable/sized, so materialize it first
        raw = self.weights
        sizes = (raw * len(it) / raw.sum()).astype(int)  # element count of each section, rounded down
        def bounds():
            # (start, stop) of every section; the last one is open ended so it absorbs rounding leftovers
            start = 0
            for size in sizes[:-1]:
                yield start, start + size
                start = start + size
            yield start, None
        arrayLike = isinstance(it, settings.arrayTypes) or (hasPandas and isinstance(it, pd.core.arraylike.OpsMixin))
        if arrayLike: return [it[a:b] for a, b in bounds()]
        return (it[a:b] for a, b in bounds())
    def _jsF(self, meta):
        """Transpiles this cli to JavaScript."""
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()
        args = cli.kjs.vs(self.weights) | cli.join(', ')
        return f"{fIdx} = ({dataIdx}) => {dataIdx}.splitW({args})", fIdx
class splitC(BaseCli):
    def __init__(self, *checkpoints:List[float]):
        """Splits elements into multiple checkpoint-delimited lists.
        Example::

            # returns [[0, 1], [2, 3, 4], [5, 6, 7, 8, 9]]
            range(10) | splitC(2, 5) | deref()
            # returns ['01', '234', '56789']
            "0123456789" | splitC(2, 5) | deref()

        Here, you're specifying 2 checkpoints, 2 and 5, so it will split
        the list up into 3 sections. First section is 0-2, second section
        is 2-5, third section is 5-end. You can pass in fractional
        checkpoints too::

            # returns [[0, 1], [2, 3, 4, 5], [6, 7, 8, 9]]
            range(10) | splitC(0.2, 0.6) | deref()

        This cli might be unintuitive to remember, so if you want to just
        split it up into 2 parts, check out :meth:`~k1lib.cli.filt.head.split`.

        If you want to split things up by weighted length, check
        out :class:`splitW`

        :param checkpoints: positions to cut at, either all whole numbers (absolute indexes) or fractions of the length"""
        self.checkpoints = checkpoints | cli.aS(np.array)
        # whole-number checkpoints are absolute indexes, anything else is a fraction of the input length
        self.intMode = checkpoints | cli.apply(lambda x: int(x) == x) | cli.aS(all)
    def __ror__(self, it):
        """Slices ``it`` at every checkpoint. Array-like inputs get a list back,
        everything else gets a generator of slices."""
        try: it[:]; len(it)
        except: it = list(it)  # not sliceable/sized, so materialize it first
        cuts = self.checkpoints
        if not self.intMode: cuts = (cuts * len(it)).astype(int)
        cuts = sorted(cuts)
        def pieces():
            yield it[:cuts[0]]
            for lo, hi in zip(cuts, cuts[1:]): yield it[lo:hi]
            yield it[cuts[-1]:]
        arrayLike = isinstance(it, settings.arrayTypes) or (hasPandas and isinstance(it, pd.core.arraylike.OpsMixin))
        if arrayLike: return list(pieces())
        return pieces()
    def _jsF(self, meta):
        """Transpiles this cli to JavaScript."""
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()
        args = cli.kjs.vs(self.checkpoints) | cli.join(', ')
        return f"{fIdx} = ({dataIdx}) => {dataIdx}.splitC({args})", fIdx
class joinStreams(BaseCli):
    def __init__(self, dims=1):
        """Joins multiple streams.
        Example::

            # returns [1, 2, 3, 4, 5]
            [[1, 2, 3], [4, 5]] | joinStreams() | deref()
            # returns [[0, 1], [2], [3, 4, 5], [6, 7, 8], [], [9, 10]]
            [[[0, 1], [2], [3, 4, 5]], [[6, 7, 8], [], [9, 10]]] | joinStreams() | deref()
            # returns [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
            [[[0, 1], [2], [3, 4, 5]], [[6, 7, 8], [], [9, 10]]] | joinStreams(2) | deref()

        If you pass in :class:`numpy.ndarray` or :class:`torch.Tensor`, then it will
        automatically use the C-accelerated version, like this::

            # returns Tensor with shape (6, 4)
            torch.randn(2, 3, 4) | joinStreams()
            # returns array with shape (6, 4)
            np.random.randn(2, 3, 4) | joinStreams()

        Sometimes, you may want to impose some dimensional structure after joining all streams
        together, which :class:`reshape` does.

        If "joinStreams" is too unintuitive to remember, there's also an alias called
        ``joinSt``.

        :param dims: how many ``joinStreams()`` do you want to do consecutively?"""
        if dims < 0: raise AttributeError(f"`dims` ({dims}) can't be less than 0, as it doesn't make any sense!")
        self.dims = dims
        # several levels are flattened by chaining that many single-level joins
        self.multi = None
        if dims > 1: self.multi = cli.serial(*[joinStreams() for _ in range(dims)])
    def _all_array_opt(self, it, level:int):
        """Array fast path: merges ``dims+1`` consecutive dimensions starting at ``level`` with a reshape."""
        if self.dims == 0: return it
        sh = tuple(it.shape); end = level + self.dims + 1
        if len(sh) < end: raise init.ArrayOptException(f"joinSt({self.dims}).all({level}) can't be applied to an array of shape {sh}")
        merged = sh[level:end] | cli.toProd()
        return it.reshape([*sh[:level], merged, *sh[end:]])
    def __ror__(self, streams:Iterator[Iterator[Any]]) -> Iterator[Any]:
        """Flattens ``streams`` by ``dims`` levels."""
        if isinstance(streams, settings.arrayTypes): return self._all_array_opt(streams, 0)
        if self.multi is not None: return streams | self.multi
        if self.dims != 1: return streams  # dims == 0: nothing to join
        def gen():
            for stream in init.dfGuard(streams):
                yield from stream
        return gen()
    def _jsF(self, meta):
        """Transpiles this cli to JavaScript."""
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()
        return f"{fIdx} = ({dataIdx}) => {dataIdx}.joinSt({cli.kjs.v(self.dims)})", fIdx
joinSt = joinStreams
def probScale(ps, t):
    """Rescales probabilities ``ps`` in log space and renormalizes them.
    With ``t = 0`` the result is uniform, with ``t = 1`` it equals ``ps``
    (assuming ``ps`` already sums to 1).

    :param ps: probabilities, all expected to be positive
    :param t: scaling factor, from 0 -> 1 for typical usage
    :return: numpy array of probabilities summing to 1"""
    logPs = np.log(ps)
    center = logPs.mean()
    scaled = (logPs - center) * t + center
    scaled = scaled - scaled.max()  # largest exponent becomes 0, so np.exp can't overflow
    weights = np.exp(scaled)
    return weights / weights.sum()
import random # probScale
def rand(n, ps=None):
    """Endless generator of random indexes in ``range(n)``.

    :param n: number of possible indexes
    :param ps: optional probability of each index. If None, indexes are uniform"""
    if ps is not None:
        # weighted indexes are drawn from numpy in chunks of 100
        while True:
            for idx in np.random.choice(n, size=100, p=ps):
                yield idx
    while True:
        yield random.randrange(n)
class joinStreamsRandom(BaseCli):
    def __init__(self, alpha=0, ps=None):
        """Join multiple streams randomly. If any streams runs out, then quits. If
        any stream yields :data:`~k1lib.cli.init.yieldT`, then just ignores that result and
        continue. Could be useful in active learning. Example::

            # could return [0, 1, 10, 2, 11, 12, 13, ...], with max length 20, typical length 18
            [range(0, 10), range(10, 20)] | joinStreamsRandom() | deref()

            stream2 = [[-5, yieldT, -4, -3], yieldT | repeat()] | joinStreams()
            # could return [-5, -4, 0, -3, 1, 2, 3, 4, 5, 6], demonstrating yieldT
            [range(7), stream2] | joinStreamsRandom() | deref()

        By default, all streams are treated equally, and are yielded with equal probabilities.
        However, you can tweak these probabilities a little bit, to your liking. This is
        controlled by the parameter ``alpha``:

        .. image:: ../images/probScale.png

        If ``alpha`` is 0, then all probabilities will be the same. If ``alpha`` is 1,
        then all probabilities are proportional to the length of the input stream. The
        original intention was to vary ``alpha`` just from 0 to 1, but it can actually
        be of any number::

            [range(0, 10), range(10, 100)] | joinStreamsRandom(0) | shape(0) # returns around 21, because it favors both streams equally
            [range(0, 10), range(10, 100)] | joinStreamsRandom(1) | shape(0) # returns around 90, because it favors the second array 9x more
            [range(0, 10), range(10, 100)] | joinStreamsRandom(100) | shape(0) # returns 90, because it highly favors the second array
            [range(0, 10), range(10, 100)] | joinStreamsRandom(-100) | shape(0) # returns 10, because it highly favors the first array

        :param alpha: if not zero, does a weighted joining, instead of totally uniform probability
        :param ps: if specified, use these probabilities, else try to determine from the lengths of the input streams"""
        super().__init__(); self.alpha = alpha; self.ps = ps
    def __ror__(self, streams:Iterator[Iterator[Any]]) -> Iterator[Any]:
        """Lazily yields elements from randomly picked streams until any picked stream runs out."""
        alpha = self.alpha; ps = self.ps
        streams = list(init.dfGuard(streams)); nStreams = len(streams)
        if alpha == 0:
            ps = np.array([1/nStreams]*nStreams)
        elif ps is not None:
            ps = np.array(list(ps))*1.0
        else:
            # no explicit probabilities: weigh every stream by its length
            try: ps = np.array([len(st) for st in streams])
            except:
                # some stream has no len(), so materialize all of them first
                streams = [list(st) for st in streams]
                ps = np.array([len(st) for st in streams])
        ps = probScale(ps/ps.sum(), alpha)
        iters = [iter(st) for st in streams]
        try:
            for idx in rand(len(iters), ps):
                item = next(iters[idx])
                if item is not yieldT: yield item  # identity check, to avoid numpy's elementwise `==`
        except StopIteration:
            pass
    def _jsF(self, meta):
        """Transpiles this cli to JavaScript."""
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()
        if self.alpha != 0 and self.ps is not None: raise Exception("joinStreamsRandom._jsF() doesn't allow custom alpha and ps yet")
        return f"{fIdx} = ({dataIdx}) => {dataIdx}.joinStreamsRandom()", fIdx
class activeSamples(BaseCli):
    def __init__(self, limit:int=100, p:float=0.95):
        """Yields active learning samples.
        Example::

            o = activeSamples()
            ds = range(10) # normal dataset
            ds = [o, ds] | joinStreamsRandom() # dataset with active learning capability
            next(ds) # returns 0
            next(ds) # returns 1
            next(ds) # returns 2
            o.append(20)
            next(ds) # can return 3 or 20
            next(ds) # can return (4 or 20) or 4

        So the point of this is to be a generator of samples. You can define your dataset
        as a mix of active learning samples and standard samples. Whenever there's a data
        point that you want to focus on, you can add it to ``o`` and it will eventually
        yield it.

        .. warning::

            It might not be a good idea to set param ``limit`` to higher numbers than
            100. This is because, the network might still not understand a wrong sample
            after being shown multiple times, and will keep adding that wrong sample
            back in, distracting it from other samples, and reduce network's accuracy
            after removing active learning from it.

            If ``limit`` is low enough (from my testing, 30-100 should be fine), then
            old wrong samples will be kicked out, allowing for a fresh stream of wrong
            samples coming in, and preventing the problem above. If you found that
            removing active learning makes the accuracy drops dramatically, then try
            decreasing the limit.

        :param limit: max number of active samples. Discards samples if number of samples
            is over this.
        :param p: probability of actually adding the samples in"""
        super().__init__()
        self.p = p
        self.samples = deque(maxlen=limit)  # bounded queue, the oldest samples fall off first
    def append(self, item):
        """Adds 1 sample."""
        if random.random() >= self.p: return  # randomly dropped
        self.samples.append(item)
    def extend(self, items):
        """Adds multiple samples."""
        for item in items:
            self.append(item)
    def __iter__(self):
        """Endless iterator: pops the oldest sample if there's any, else yields ``yieldT``."""
        queue = self.samples
        while True:
            yield queue.popleft() if queue else yieldT
def table(delim:str=None):
    """Basically ``op().split(delim).all()``. This exists because this is used
    quite a lot in bioinformatics. Example::

        # returns [['a', 'bd'], ['1', '2', '3']]
        ["a|bd", "1|2|3"] | table("|") | deref()

    :param delim: delimiter to split each line on. If None, the patched default delimiter is used"""
    splitLine = cli.op().split(patchDefaultDelim(delim))
    return splitLine.all()
def _batch(it, bs, includeLast, incl): # _batch
l = []; it = iter(it); sentinel = object(); last = sentinel # _batch
try: # _batch
if incl: # _batch
while True: # _batch
if last is not sentinel: l.append(last) # _batch
for i in range(bs): l.append(next(it)) # _batch
last = next(it); l.append(last) # _batch
yield l; l = [] # _batch
else: # _batch
while True: # _batch
for i in range(bs): l.append(next(it)) # _batch
yield l; l = [] # _batch
pass # _batch
except StopIteration: # _batch
if includeLast and len(l) > 0: yield l # _batch
def _batchRange(it, bs, includeLast, incl): # _batchRange
start, stop, step = it.start, it.stop, it.step # _batchRange
lastCur = start; cur = lastCur + bs*step # _batchRange
if incl: # _batchRange
while cur <= stop: yield range(lastCur, min(cur+1, stop), step); lastCur = cur; cur += bs*step # _batchRange
else: # _batchRange
while cur <= stop: yield range(lastCur, cur, step); lastCur = cur; cur += bs*step # _batchRange
if includeLast and lastCur < stop: yield range(lastCur, stop, step) # _batchRange
def _batchSliceable(it, bs, includeLast, incl): # _batchSliceable
cur = 0; n = len(it) # _batchSliceable
if incl: # _batchSliceable
while (cur+1)*bs <= n: yield it[cur*bs:(cur+1)*bs+1]; cur += 1 # _batchSliceable
else: # _batchSliceable
while (cur+1)*bs <= n: yield it[cur*bs:(cur+1)*bs]; cur += 1 # _batchSliceable
if includeLast: # _batchSliceable
ans = it[cur*bs:(cur+1)*bs] # _batchSliceable
if len(ans) > 0: yield ans # _batchSliceable
[docs]class batched(BaseCli): # batched
[docs] def __init__(self, bs=32, includeLast=False, incl=False): # batched
"""Batches the input stream.
# returns [[0, 1, 2], [3, 4, 5], [6, 7, 8]]
range(11) | batched(3) | deref()
# returns [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10]]
range(11) | batched(3, True) | deref()
# returns [[0, 1, 2, 3, 4]]
range(5) | batched(float("inf"), True) | deref()
# returns []
range(5) | batched(float("inf"), False) | deref()
# returns [[0, 1, 2, 3], [3, 4, 5, 6], [6, 7, 8, 9]], includes the first element of the next batch!
range(11) | batched(3, incl=True) | deref()
# returns [[0, 1, 2, 3], [3, 4, 5, 6], [6, 7, 8, 9], [9, 10]]
range(11) | batched(3, True, incl=True) | deref()
Can work well and fast with :class:`torch.Tensor` and :class:`numpy.ndarray`::
# both returns torch.Tensor of shape (2, 3, 4, 5)
torch.randn(6, 4, 5) | batched(3)
torch.randn(7, 4, 5) | batched(3)
Also, if input is a :class:`range`, then to save time, a bunch of other
ranges will be returned, instead of a bunch of lists, for performance::
# returns [range(0, 3), range(3, 6), range(6, 9)]
range(11) | batched(3) | toList()
See also: :class:`window`, :class:`batchedTrigger`
:param bs: batch size
:param includeLast: whether to include the last incomplete batch or not
:param incl: whether to have inclusive bounds or not""" # batched
super().__init__(); self.bs = bs; self.includeLast = includeLast; self.incl = incl # batched
def _all_array_opt(self, it, level): # batched
if self.includeLast: return NotImplemented # batched
bs = self.bs; a = [slice(None, None, None)]*level; n = it.shape[level] // bs; it = it[(*a, slice(None, n*bs))] # batched
return it.reshape(*it.shape[:level], n, bs, *it.shape[level+1:]) # batched
[docs] def __ror__(self, it): # batched
bs = self.bs; includeLast = self.includeLast; incl = self.incl # batched
if bs == float("inf"): return [it] if includeLast else [] # batched
if not incl and isinstance(it, k1lib.settings.cli.arrayTypes): # batched
if (not includeLast) or (it.shape[0]%bs == 0): n = it.shape[0] // bs; it = it[:n*bs]; return it.reshape(n, bs, *it.shape[1:]) # batched
if not incl and isinstance(it, range): return _batchRange(it, bs, includeLast, incl) # batched
try: it[:]; len(it); return _batchSliceable(it, bs, includeLast, incl) # batched
except: return _batch(it, bs, includeLast, incl) # batched
def _jsF(self, meta):
    """Builds the JavaScript transpilation of this operation.

    Only ``bs`` and ``includeLast`` are forwarded to the JS ``batched`` call.

    :return: tuple of (JS source defining the function, function name)"""
    fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()
    bs = cli.kjs.v(self.bs); includeLast = cli.kjs.v(self.includeLast)
    return f"{fIdx} = ({dataIdx}) => {dataIdx}.batched({bs}, {includeLast})", fIdx
def _btEqualRuns(rows, col):
    """Yields ``(key, batch)`` for every run of consecutive rows whose trigger
    values compare equal. ``key`` is the trigger value of the last row in the
    run. There is one loop per flavour of ``col`` so that no dispatching has
    to happen per row, which would be slow."""
    batch = []; prev = object() # sentinel
    if col is None: # the elements themselves are the trigger values
        for e in rows:
            if e == prev: batch.append(e)
            else:
                if len(batch): yield prev, batch
                batch = [e]
            prev = e
    elif isinstance(col, int): # single trigger column
        for row in rows:
            e = row[col]
            if e == prev: batch.append(row)
            else:
                if len(batch): yield prev, batch
                batch = [row]
            prev = e
    else: # multiple trigger columns, compared together as a tuple
        for row in rows:
            e = tuple([row[x] for x in col])
            if e == prev: batch.append(row)
            else:
                if len(batch): yield prev, batch
                batch = [row]
            prev = e
    if len(batch): yield prev, batch
def _btDeltaRuns(rows, col, delta, adj):
    """Yields batches of consecutive rows whose trigger values stay within
    ``delta`` of a reference value. If ``adj`` is true, the reference is the
    previous row's value, else it's the value of the first row in the batch."""
    if not (col is None or isinstance(col, int)):
        raise Exception("Multiple columns not supported in delta mode, as it doesn't make any sense")
    batch = []; ref = float("-inf")
    if adj:
        if col is None:
            for e in rows:
                if abs(e-ref) < delta: batch.append(e)
                else:
                    if len(batch): yield batch
                    batch = [e]
                ref = e # reference follows every element
        else:
            for row in rows:
                e = row[col]
                if abs(e-ref) < delta: batch.append(row)
                else:
                    if len(batch): yield batch
                    batch = [row]
                ref = e
    else:
        if col is None:
            for e in rows:
                if abs(e-ref) < delta: batch.append(e)
                else:
                    if len(batch): yield batch
                    batch = [e]; ref = e # reference only moves when a new batch starts
        else:
            for row in rows:
                e = row[col]
                if abs(e-ref) < delta: batch.append(row)
                else:
                    if len(batch): yield batch
                    batch = [row]; ref = e
    if len(batch): yield batch
class batchedTrigger(BaseCli): # batches the data using a trigger column
    def __init__(self, col:int=None, value:Any=None, delta:float=None, adj:bool=True): # value is to only yield segments that have the specified value
        """Like :class:`batched`, will batch rows/elements up but based on
a trigger value instead. See :class:`~k1lib.cli.filt.trigger` for more
discussion. Normal mode::

    data = [[1, "a"], [1, "b"], [2, "c"], [3, "d"], [3, "e"], [1, "f"]]
    # returns [[1, 1], [2], [3, 3], [1]]
    [1, 1, 2, 3, 3, 1] | batchedTrigger() | deref()
    # throws error, does not make sense to invert search. Can only invert if .value is specified
    [1, 1, 2, 3, 3, 1] | ~batchedTrigger() | deref()
    # returns [[[1, 'a'], [1, 'b']], [[2, 'c']], [[3, 'd'], [3, 'e']], [[1, 'f']]]
    data | batchedTrigger(0) | deref()

In this mode, consecutive elements whose specified column have the same
value are grouped together into a batch. Note the difference between this
and :class:`groupBy`, which groups non-consecutive elements too.

Specific-value mode::

    # returns [[1, 1], [1]]
    [1, 1, 2, 3, 3, 1] | batchedTrigger(value=1) | deref()
    # returns [[2], [3, 3]], essentially only yield the batches that don't match the specified value
    [1, 1, 2, 3, 3, 1] | ~batchedTrigger(value=1) | deref()
    # returns [[[1, 'a'], [1, 'b']], [[1, 'f']]]
    data | batchedTrigger(0, value=1) | deref()

This mode batches exactly like the normal mode, but only returns the batches
whose trigger value matches (or, if inverted, does not match) a specific value.

Finally, the delta mode::

    # returns [[1, 1.1, 1.2], [2], [3], [3.55, 3.8, 3.9, 4.1], [6], [10]]
    [1, 1.1, 1.2, 2, 3, 3.55, 3.8, 3.9, 4.1, 6, 10] | batchedTrigger(delta=0.5) | deref()
    # returns [[1, 1.1, 1.2], [2], [3], [3.55, 3.8, 3.9], [4.1], [6], [10]]
    [1, 1.1, 1.2, 2, 3, 3.55, 3.8, 3.9, 4.1, 6, 10] | batchedTrigger(delta=0.5, adj=False) | deref()
    # .col also works here, but it's too long to fit here

In contrast to the previous 2 modes, this does not rely on exact values. Instead,
2 consecutive elements are considered to be in the same batch if their difference
is less than the provided ``delta`` value. If ``adj`` is True, the delta is
measured between the previous element (3.9) and the current one (4.1). If
``adj`` is False, it's measured between the first element of the batch (3.55)
and the current one (4.1), which exceeds delta and thus starts a new batch.

See also: :class:`~k1lib.cli.filt.trigger`

:param col: column of the trigger value. Can be None, int, or list[int]
:param value: if trigger value equals to this, then yields the batch
:param delta: if trigger value changes more than this, then yields the batch
:param adj: "adjacent", only makes sense if ``delta != None``. See example above"""
        self.col = col; self.value = value; self.delta = delta
        self.adj = adj; self.inverted = False
        if value is not None and delta is not None:
            raise Exception(f"Can't specify precise value and fuzzy values (by specifying delta) at the same time. Does not make sense! Leave either .value or .delta empty")
    def __invert__(self):
        """Returns a new specific-value-mode instance that yields the batches
        the current one would drop. Raises if ``.value`` is not specified."""
        if self.value is None:
            raise Exception("batchedTrigger().value is None! Inverting it would not make sense!")
        flipped = batchedTrigger(self.col, self.value)
        flipped.inverted = not self.inverted
        return flipped
    def __ror__(self, it):
        """Lazily yields the batches of ``it`` according to the configured mode."""
        col = self.col; value = self.value; delta = self.delta
        rows = init.dfGuard(it)
        if value is None and delta is None: # normal mode
            for _, batch in _btEqualRuns(rows, col): yield batch
        elif value is not None: # normal mode, but only returns specified value
            inv = self.inverted
            # multi-column triggers are compared as tuples, so normalize the target too
            if not (col is None or isinstance(col, int)): value = tuple(value)
            for key, batch in _btEqualRuns(rows, col):
                if (key == value) ^ inv: yield batch
        elif delta is not None: # delta mode
            yield from _btDeltaRuns(rows, col, delta, self.adj)
        else: raise Exception("Unreachable")
nothing = object() # marks "no pad value given", as None is a perfectly valid pad value
class window(BaseCli):
    def __init__(self, n, newList=False, pad=nothing):
        """Slides window of size n forward and yields the windows.
Example::

    # returns [[0, 1, 2], [1, 2, 3], [2, 3, 4]]
    range(5) | window(3) | deref()
    # returns [[0, 1, 2], [1, 2, 3], [2, 3, 4], [3, 4, None], [4, None, None]]
    range(5) | window(3, pad=None) | deref()
    # returns [[0, None, None]], input shorter than the window still yields 1 full window per element
    range(1) | window(3, pad=None) | deref()

If you are doing strange transformations to the result, like
transposing it, then it might complain that the internal deque
(double-ended queue) mutated during iteration. In that case,
then set ``newList`` to True. It's not True by default because
multiple lists will be created, all of which needs memory
allocation, which will be slower::

    # takes 15ms
    range(100000) | window(100) | ignore()
    # takes 48ms, because of allocating lists
    range(100000) | window(100, True) | ignore()

See also: :class:`~batched`

:param n: size of the window
:param newList: whether to create a new list out of every window or
    not. If False (default), less robust but faster. If True, more
    robust but slower
:param pad: if specified, pads the end of the stream with this value, so
    that the output has the same number of elements as the input stream"""
        self.n = n; self.listF = (lambda x: list(x)) if newList else (lambda x: iter(x))
        self.pad = pad; self.padBool = pad is not nothing # why do this? Cause in applyMp, "nothing" takes on multiple identities
    def _all_array_opt(self, it, level):
        """Vectorized sliding window along axis ``level`` using fancy indexing.

        :return: array with axis ``level`` replaced by 2 axes (window index,
            position inside window), or ``NotImplemented`` if padding is
            requested, as this path can't produce padded windows"""
        if self.padBool: return NotImplemented
        n = it.shape[level]; w = self.n; h = n - w + 1 # h: number of full windows
        i = np.arange(h)[:,None] + np.arange(w)[None] # i[a, b] = a + b, index of b-th element of a-th window
        return it[(*[slice(None, None, None)]*level,i)]
    def __ror__(self, it):
        """Returns the windows of ``it``. Arrays without padding are handled
        by :meth:`_all_array_opt`, everything else by a lazy generator."""
        if not self.padBool and isinstance(it, settings.arrayTypes): return self._all_array_opt(it, 0)
        def gen():
            n = self.n; pad = self.pad; q = deque([], n); listF = self.listF
            for e in init.dfGuard(it):
                q.append(e)
                if len(q) == n: yield listF(q); q.popleft()
            if self.padBool:
                # q now holds the last min(len(input), n-1) elements, each of which
                # still has to start its own window. If the input was shorter than
                # n-1, top q up first so that those windows have the full size n
                tail = len(q); q.extend([pad]*(n-1-tail))
                for _ in range(tail): q.append(pad); yield listF(q); q.popleft()
        return gen()
    def _jsF(self, meta):
        """Builds the JavaScript transpilation. Padding is not supported there.

        :return: tuple of (JS source defining the function, function name)"""
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()
        if self.padBool: raise Exception("window._jsF() doesn't support custom pad value yet")
        return f"{fIdx} = ({dataIdx}) => {dataIdx}.window({cli.kjs.v(self.n)})", fIdx
class groupBy(BaseCli):
    def __init__(self, column:int, separate:bool=False, removeCol:bool=None):
        """Groups table by some column. Example::

    a = [[2.3, 5],
         [3.4, 2],
         [4.5, 2],
         [5.6, 5],
         [6.7, 1]]
    a | groupBy(1) | deref()

This returns::

    [[[6.7, 1]],
     [[3.4, 2], [4.5, 2]],
     [[2.3, 5], [5.6, 5]]]

Should have O(n log(n)) time complexity. What if ``separate`` is True::

    a | groupBy(1, True) | deref()

This returns::

    [[1, [[6.7]]],
     [2, [[3.4], [4.5]]],
     [5, [[2.3], [5.6]]]]

What if ``removeCol`` is False::

    a | groupBy(1, True, False) | deref()

This returns::

    [[1, [[6.7, 1]]],
     [2, [[3.4, 2], [4.5, 2]]],
     [5, [[2.3, 5], [5.6, 5]]]]

There's another perspective and way to think about this operation. A lot of
libraries (like pandas) expect the uncompressed, "flat" version (the variable
``a`` in the examples above). But throughout my time using cli, the grouped
version (separate=True) is usually much more useful and amenable to
transformations. It also occupies less memory too, as the columns with duplicated
elements are deleted.

So, you can sort of think :class:`groupBy` is converting pandas dataframes
into a more easily digestible form. But because of the prevalence of those
libraries, after doing all the transformations you want, sometimes it's
necessary to flatten it again, which :class:`ungroup` does.

If you want to group text lines by some pattern in them, :class:`~k1lib.cli.grep.grep`
with ``sep=True`` might be better for you.

:param column: which column to group by
:param separate: whether to separate out the column to sort of form a dict or not. See example
:param removeCol: whether to remove the grouped-by column. Defaults to True if
    ``separate=True``, and False if ``separate=False``"""
        self.column = column; self.separate = separate
        # if separate, then remove cols, else don't do it
        self.removeCol = separate if removeCol is None else removeCol
    def __ror__(self, it):
        """Sorts the table by the group column, then lazily yields one group
        per run of equal values in that column."""
        col = self.column; separate = self.separate; removeCol = self.removeCol
        it = init.dfGuard(it)
        if not isinstance(it, settings.arrayTypes): it = [list(e) for e in init.dfGuard(it)]
        rows = iter(it | cli.sort(col, False))
        missing = object(); first = next(rows, missing)
        if first is missing: return # empty table, no groups
        def pack(key, group):
            # applies the removeCol/separate options to a finished group
            if removeCol: group = group | ~cli.cut(col)
            return [key, group] if separate else group
        group = [first]; key = first[col]
        for row in rows:
            if row[col] == key: group.append(row)
            else:
                yield pack(key, group)
                group = [row]; key = row[col]
        yield pack(key, group)
    def _jsF(self, meta):
        """Builds the JavaScript transpilation.

        :return: tuple of (JS source defining the function, function name)"""
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()
        column = cli.kjs.v(self.column); separate = cli.kjs.v(self.separate); removeCol = cli.kjs.v(self.removeCol)
        return f"{fIdx} = ({dataIdx}) => {dataIdx}.groupBy({column}, {separate}, {removeCol})", fIdx
class ungroup(BaseCli):
    def __init__(self, single=True, begin=True, insertCol:bool=True):
        """Ungroups things that were grouped using a specific mode of
:class:`groupBy`. Particularly useful to transform some complex data
structure into a flat dataframe so that you can plug into pandas. Example::

    # returns [[3, 1.2], [3, 3.4], [5, 6], [5, 8], [5, 11]]
    [[3, [1.2, 3.4]], [5, [6, 8, 11]]] | ungroup() | deref()
    # returns [[3, 1.2], [3, 3.4], [5, 6], [5, 8], [5, 11]]
    [[3, [[1.2], [3.4]]], [5, [[6], [8], [11]]]] | ungroup(False) | deref()
    # returns [[1.2, 3], [3.4, 3], [6, 5], [8, 5], [11, 5]]
    [[3, [1.2, 3.4]], [5, [6, 8, 11]]] | ungroup(begin=False) | deref()

:param single: whether the table in each group has a single column or not
:param begin: whether to insert the column at the beginning or at the end.
    Only works if ``insertCol`` is True
:param insertCol: whether to insert the column into the table or not"""
        self.single = single; self.begin = begin; self.insertCol = insertCol
    def __ror__(self, it):
        """Flattens ``[key, table]`` pairs back into a single stream of rows."""
        begin = self.begin
        # single-column groups hold bare elements, so wrap each into a 1-element row first
        toRows = cli.apply(cli.wrapList().all(), 1) if self.single else cli.iden()
        it = init.dfGuard(it)
        if not self.insertCol:
            return it | toRows | cli.cut(1) | cli.joinStreams()
        attachKey = ~cli.apply(lambda x, arr: arr | cli.insert(x, begin).all())
        return it | toRows | attachKey | cli.joinStreams()
    def _jsF(self, meta):
        """Builds the JavaScript transpilation.

        :return: tuple of (JS source defining the function, function name)"""
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()
        single = cli.kjs.v(self.single); begin = cli.kjs.v(self.begin); insertCol = cli.kjs.v(self.insertCol)
        return f"{fIdx} = ({dataIdx}) => {dataIdx}.ungroup({single}, {begin}, {insertCol})", fIdx
class insertColumn(BaseCli):
    def __init__(self, column:List[Any], begin=True, fill=""):
        """Inserts a column at beginning or end. Example::

    # returns [['a', 1, 2], ['b', 3, 4]]
    [[1, 2], [3, 4]] | insertColumn(["a", "b"]) | deref()
    # returns [[1, 2, 'a'], [3, 4, 'b']]
    [[1, 2], [3, 4]] | insertColumn(["a", "b"], begin=False) | deref()

:param column: the column to insert
:param begin: whether to insert at the beginning or at the end
:param fill: passed on to ``transpose.wrap()`` as its fill value"""
        self.column = column; self.begin = begin; self.fill = fill
        # inserting a column is inserting a row into the transposed table
        rowInserter = insert(column, begin=begin)
        self._f = transpose.wrap(rowInserter, fill=fill)
    def __ror__(self, it):
        """Pipes ``it`` through the prebuilt transpose-insert-transpose operation."""
        # conveniently, because transpose() and insert() has _all_array_opt(), this automatically has that property as well! Super nice!
        return it | self._f
    def _jsF(self, meta):
        """Delegates the JavaScript transpilation to the wrapped operation."""
        return self._f._jsF(meta)
class insertIdColumn(BaseCli):
    def __init__(self, table=False, begin=True):
        """Inserts an id column at the beginning (or end). Example::

    # returns [[0, 'a', 2], [1, 'b', 4]]
    [["a", 2], ["b", 4]] | insertIdColumn(True) | deref()
    # returns [[0, 'a'], [1, 'b']]
    "ab" | insertIdColumn() | deref()

This has a shorter alias called :class:`insId`

:param table: if False, then insert column to an Iterator[str], else treat
    input as a full fledged table
:param begin: whether to insert the id column at the beginning or at the end"""
        self.table = table; self.begin = begin
    def _all_array_opt(self, it, level):
        """Vectorized path for arrays, operating at axis ``level``. Only
        handles the case where ``level`` is the row axis of the innermost
        table (table mode) or the innermost axis (list mode), and returns
        ``NotImplemented`` otherwise."""
        nd = len(it.shape)
        if not self.table:
            if level != nd-1: return NotImplemented
            # turns every element into a 1-column row, then reuses the table mode
            return it[(*[slice(None,None,None)]*nd,None)] | insId(True, self.begin).all(level)
        if level != nd-2: return NotImplemented
        # id column, repeated once for every table along the leading axes
        b = (np.arange(it.shape[level]) | cli.repeat(np.prod(it.shape[:level]))).reshape((*it.shape[:level+1], 1))
        return (np if isinstance(it, np.ndarray) else torch).concatenate([b, it] if self.begin else [it, b], -1)
    def __ror__(self, it):
        """Returns ``it`` with an id column added. Arrays of a fitting rank,
        DataFrames (table mode) and Series (list mode) are handled natively,
        everything else goes through a lazy generator."""
        # if isinstance(it, settings.arrayTypes): return self._all_array_opt(it, 0)
        table = self.table; begin = self.begin
        if isinstance(it, settings.arrayTypes):
            bm = np if isinstance(it, np.ndarray) else torch
            if table and len(it.shape) == 2:
                b = bm.arange(it.shape[0])[:,None]
                return bm.concatenate([b, it] if begin else [it, b], 1)
            if not table and len(it.shape) == 1:
                a = it[:,None]; b = bm.arange(len(it))[:,None]
                return bm.concatenate([b, a] if begin else [a, b], 1)
            # arrays of other ranks fall through to the generic paths below
        if table:
            if hasPandas and isinstance(it, pd.DataFrame):
                try: return it | T() | insert(range(len(it)), self.begin) | T()
                except: it = init.dfGuard(it) # native route failed, treat it as a plain table
            if begin: return ([i, *e] for i, e in enumerate(it))
            return ([*e, i] for i, e in enumerate(it))
        if hasPandas and isinstance(it, pd.Series):
            if not it.name: it.name = "A" # the resulting DataFrame needs a column name
            try:
                if begin: return pd.DataFrame({"_auto_idx": range(len(it)), it.name: it})
                return pd.DataFrame({it.name: it, "_auto_idx": range(len(it))})
            except: pass
        if begin: return ([i, e] for i, e in enumerate(init.dfGuard(it)))
        return ([e, i] for i, e in enumerate(init.dfGuard(it)))
    def _jsF(self, meta):
        """Builds the JavaScript transpilation.

        :return: tuple of (JS source defining the function, function name)"""
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()
        table = cli.kjs.v(self.table); begin = cli.kjs.v(self.begin)
        return f"{fIdx} = ({dataIdx}) => {dataIdx}.insertIdColumn({table}, {begin})", fIdx
insId = insertIdColumn # shorter alias
def unsqueeze(dim:int=0):
    """Unsqueeze input iterator. Example::

    t = [[1, 2], [3, 4], [5, 6]]
    # returns (3, 2)
    t | shape()
    # returns (1, 3, 2)
    t | unsqueeze(0) | shape()
    # returns (3, 1, 2)
    t | unsqueeze(1) | shape()
    # returns (3, 2, 1)
    t | unsqueeze(2) | shape()

Behind the scenes, it's really just ``wrapList().all(dim)``, but the "unsqueeze" name
is a lot more familiar. Also note that the inverse operation "squeeze" is sort of
``item().all(dim)``, if you're sure that this is desirable::

    t = [[1, 2], [3, 4], [5, 6]]
    # returns (3, 2)
    t | unsqueeze(1) | item().all(1) | shape()

:param dim: at which depth the new dimension of size 1 is inserted"""
    wrapper = cli.wrapList()
    return wrapper.all(dim)
def oUnsqueeze(cs, ts, metadata): # reminder: if change this then also change the example in llvm.rst
    """Optimization pass: replaces an unsqueeze (``wrapList()`` nested inside
    normal ``apply()`` layers) on an array type hint with the array's native
    unsqueeze operation.

    :param cs: matched clis, only the first one is inspected
    :param ts: input type hints, only the first one is inspected
    :param metadata: unused here
    :return: list with the replacement cli, or None if the pattern doesn't match"""
    inner = cs[0]; hint = ts[0]; depth = 0
    if not isinstance(hint, tArrayTypes): return None
    # peels off the apply() layers, counting how deep the wrapList() sits
    while isinstance(inner, cli.apply) and inner.normal:
        depth += 1; inner = inner.f
    if not isinstance(inner, cli.wrapList): return None
    # output hint is the same array type with 1 more dimension (if the rank is known)
    newRank = hint.rank+1 if hint.rank is not None else None
    hint = hint.__class__(hint.child, newRank)
    if isinstance(hint, tNpArray):
        return [cli.aS(lambda x: np.expand_dims(x, depth)).hint(hint)]
    return [cli.aS(lambda x: x.unsqueeze(depth)).hint(hint)]
tOpt.addPass(oUnsqueeze, [cli.apply], 4)
class count(BaseCli):
    def __init__(self, max=False):
        """Finds unique elements and returns a table with [frequency, value, percent]
columns. Example::

    # returns [[1, 'a', '33%'], [2, 'b', '67%']]
    ['a', 'b', 'b'] | count() | deref()
    # returns [[1, 'a', '50%'], [2, 'b', '100%']]
    ['a', 'b', 'b'] | count(max=True) | deref()
    # returns [], also when max=True
    [] | count() | deref()

:param max: if True, percentages are of the max value, else they're of the sum
"""
        super().__init__(); self._max = max
    def _typehint(self, inp):
        """Output is an iterator of (int, element type, str) rows."""
        i = tAny()
        if isinstance(inp, tListIterSet): i = inp.child
        return tIter(tCollection(int, i, str))
    def __ror__(self, it:Iterator[str]):
        """Counts the elements of ``it``. Elements that are lists are turned
        into tuples first, so that they can be hashed.

        :return: list of ``[frequency, value, percent string]`` rows"""
        it = cli.apply(lambda row: (tuple(row) if isinstance(row, list) else row))(init.dfGuard(it))
        c = Counter(it)
        # default=0 so that empty input gives an empty table instead of max() throwing
        s = max(c.values(), default=0) if self._max else sum(c.values())
        return [[v, k, f"{round(100*v/s)}%"] for k, v in c.items()] # has to scan through the entire thing anyway, which is long enough already, so just turn it into a list
    @staticmethod
    def join():
        """Joins multiple counts together. Example::

    # returns [[2, 'a', '33%'], [4, 'b', '67%']]
    ['a', 'b', 'b'] | repeat(2) | applyMp(count() | deref()) | count.join() | deref()

This is useful when you want to get the count of a really long list/iterator using multiple cores"""
        def inner(counts):
            values = defaultdict(int)
            for _count in counts:
                if _count is None: continue
                for v, k, *_ in _count: # incoming percent column is ignored, it's recomputed below
                    values[k] += v
            s = values.values() | cli.toSum()
            return [[v, k, f"{round(100*v/s)}%"] for k, v in values.items()]
        return cli.applyS(inner)
    def _jsF(self, meta):
        """Builds the JavaScript transpilation.

        :return: tuple of (JS source defining the function, function name)"""
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()
        return f"{fIdx} = ({dataIdx}) => {dataIdx}.count()", fIdx
class hist(BaseCli):
    def __init__(self, bins:int=30, dropZero:bool=False):
        """Bins a long 1d array. Effectively creating a histogram, without
actually plotting it. Example::

    np.random.randn(1000) | hist(5)

That returns something like::

    (array([-2.31449761, -1.17406889, -0.03364017,  1.10678854,  2.24721726]),
     array([ 41, 207, 432, 265,  55]),
     1.14042872)

So, it returns the bin centers, the frequency of each bin and the bin width.
This format goes with :meth:`~matplotlib.pyplot.bar` directly like this::

    np.random.randn(1000) | hist(10) | ~aS(plt.bar)

If you have tons of data that's handled in multiple processes, but you want to
get an overall histogram, you can do something like this::

    # bad solution, runs slow, accurate
    fileNames | applyMp(cat() | toFloat() | aS(list)) | joinStreams() | hist() | ~aS(plt.bar)
    # good solution, runs fast, slightly inaccurate
    fileNames | applyMp(cat() | toFloat() | hist(300)) | hist.join() | ~aS(plt.bar)

Let's say in each process, you have 10M records, and that you have 1000 processes in total.
In the first solution, you transfer all records (10B records total) to a single process,
then calculate the histogram of them. The transfer overhead is going to be absolutely
enormous, as well as the computation. This really defeats the purpose of spreading
the work over multiple processes.

In the second solution, you "convert" 10M records into 600 numbers for each process,
which scales up to 600k numbers for all processes. Although big, but certainly
manageable with current hardware. So the data transfer cost is not a lot at all. The
histogram merging part also executes relatively fast, as it only creates an internal
array of 3M length. See over :meth:`hist.join` for param details

:param bins: how many bins should the histogram have?
:param dropZero: if True, will eliminate buckets that have no element in them"""
        self.bins = bins; self.dropZero = dropZero
    def __ror__(self, it):
        """Computes the histogram of ``it``.

        :return: tuple of (bin centers, frequencies, bin width)"""
        if hasPandas and isinstance(it, pd.DataFrame): it = init.dfGuard(it)
        isArrayLike = isinstance(it, settings.arrayTypes) or (hasPandas and isinstance(it, pd.core.arraylike.OpsMixin))
        if not isArrayLike: it = list(it)
        freqs, edges = np.histogram(it, bins=self.bins)
        delta = edges[1] - edges[0]
        centers = (edges[1:] + edges[:-1])/2
        if self.dropZero:
            nonEmpty = freqs > 0
            return centers[nonEmpty], freqs[nonEmpty], delta
        return centers, freqs, delta
    @staticmethod
    def join(scale:float=1e4, bins:int=None, log:bool=True, xlog:bool=False):
        """Joins multiple histograms together. Example::

    a = np.random.randn(1000); b = np.random.randn(1000)+3
    [a, b] | joinStreams() | hist() | head(2) | ~aS(plt.plot) # ---------------------------------- Ground truth
    [a, b] | hist(300).all() | hist.join(scale=1e4) | head(2) | ~aS(plt.plot) # ------------------ Log joining
    [a, b] | hist(300).all() | hist.join(scale=1e4, log=False) | head(2) | ~aS(plt.plot, ".") # -- Linear joining
    plt.legend(["Ground truth", "Log", "Linear"]); plt.grid(True); plt.ylabel("Frequency"); plt.xlabel("Value");

This results in this:

.. image:: ../images/hist1.png

As you can see, this process is only approximate, but is accurate enough in everyday
use. If you are a normal user, then this is probably enough. However, if you're a
mathematician and really care about the accuracy of this, read on.

.. admonition:: Performance vs accuracy

    As mentioned in :class:`hist`, joining histograms from across processes can really speed
    things up big time. But the joining process is complicated, with multiple parameters
    and different tradeoffs for each config. In this example, scale = 1e4, bins = 30,
    OG bins = 300, log = True. "OG bins" is the number of bins coming into :meth:`hist.join`

    To get the best accuracy possible, you should set scale and OG bins high. If better
    performance is desired, you should first lower scale, then lower OG bins, then finally
    lower bins.

.. admonition:: Log scale

    Take a look at this piece of code::

        a, b = np.random.randn(1000)*1, np.random.randn(3000000)*0.3+3
        [a, b] | joinStreams() | hist() | head(2) | ~aS(plt.plot)
        [a, b] | hist(300).all() | hist.join(scale=1e4) | head(2) | ~aS(plt.plot)
        [a, b] | hist(300).all() | hist.join(scale=1e4, log=False) | head(2) | ~aS(plt.plot, ".")
        plt.yscale("log"); plt.legend(["Ground truth", "Log", "Linear"]); plt.grid(True); plt.ylabel("Frequency"); plt.xlabel("Value");

    This results in:

    .. image:: ../images/hist2.png

    This shows how log mode is generally better than linear mode when the frequencies span
    across multiple orders of magnitude. So why not delete linear mode directly? Well, I have
    not formally proved that log scale case fully covers the linear case, although in practice
    it seems so. So just to be cautious, let's leave it in

.. admonition:: Scale

    The setup is just like in the "Log scale" section, but with scale = 1e3 instead of the default 1e4:

    .. image:: ../images/hist3.png

    Remember that the higher the scale, the more accurate, but also the longer it runs. If
    the difference in high and low frequencies are bigger than scale, then the low
    frequency values are dropped.

:param scale: how big a range of frequencies do we want to capture?
:param bins: output bins. If not specified automatically defaults to 1/10 the
    original number of bins
:param log: whether to transform everything to log scale internally on the y axis
:param xlog: whether to transform everything to log scale internally on the x axis"""
        def inner(it):
            it = it | (cli.apply(cli.apply(math.log), 0) if xlog else cli.iden()) | cli.deref()
            if bins is not None: nBins = bins
            else: nBins = it | cli.cut(0) | cli.shape(0).all() | cli.toMean() | cli.op()/10 | cli.aS(int)
            # frequencies are expressed in units of maxY, the smallest frequency that survives
            maxY = max(it | cli.cut(1) | cli.toMax().all() | cli.toMax() | cli.op()/scale, 1e-9)
            if log:
                def rescale(y): return np.log(y+1e-9)-math.log(maxY) | cli.aS(np.exp) | cli.aS(np.round) | cli.op().astype(int)
            else:
                def rescale(y): return (y/maxY).astype(int)
            it = it | cli.apply(rescale, 1) | cli.deref()
            # repeats every bin center by its rescaled frequency, then histograms all of those
            x, y, delta = it | cli.cut(0, 1) | cli.transpose().all() | cli.joinStreams() | ~cli.apply(lambda a,b: [a]*b) | cli.joinStreams() | cli.aS(list) | cli.aS(np.array) | hist(nBins)
            return x | ((cli.apply(math.exp) | cli.deref()) if xlog else cli.iden()), y*maxY, delta
        return cli.aS(inner)
def _permuteGen(row, pers):
    """Lazily yields the elements of ``row`` at the positions listed in ``pers``.

    ``row`` is turned into a list right away (so it can be any iterable),
    while the lookups only happen as the returned generator is consumed.

    :param row: iterable of elements
    :param pers: iterable of indexes into ``row``"""
    cells = list(row)
    return (cells[idx] for idx in pers)
class permute(BaseCli):
    def __init__(self, *permutations:List[int]):
        """Permutes the columns. Acts kinda like :meth:`torch.Tensor.permute`.
Example::

    # returns [['b', 'a'], ['d', 'c']]
    ["ab", "cd"] | permute(1, 0) | deref()

:param permutations: for each output column, the index of the input column to take"""
        super().__init__(); self.permutations = permutations
    def __ror__(self, it:Iterator[str]):
        """Lazily yields every row of ``it`` with its columns rearranged.
        Each yielded row is itself a generator."""
        order = self.permutations
        yield from (_permuteGen(row, order) for row in it)
    def _jsF(self, meta):
        """Builds the JavaScript transpilation.

        :return: tuple of (JS source defining the function, function name)"""
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()
        args = cli.kjs.vs(self.permutations) | cli.join(', ')
        return f"{fIdx} = ({dataIdx}) => {dataIdx}.permute({args})", fIdx
class AA_(BaseCli):
    def __init__(self, *idxs:List[int], wraps=False):
        """Returns 2 streams, one that has the selected element, and the other
the rest. Example::

    # returns [5, [1, 6, 3, 7]]
    [1, 5, 6, 3, 7] | AA_(1)
    # returns [[5, [1, 6, 3, 7]]]
    [1, 5, 6, 3, 7] | AA_(1, wraps=True)
    # returns [7, [1, 5, 6, 3]], negative indexes count from the end
    [1, 5, 6, 3, 7] | AA_(-1)

You can also put multiple indexes through::

    # returns [[1, [5, 6]], [6, [1, 5]]]
    [1, 5, 6] | AA_(0, 2)

If you don't specify anything, then all indexes will be sliced::

    # returns [[1, [5, 6]], [5, [1, 6]], [6, [1, 5]]]
    [1, 5, 6] | AA_()

As for why the strange name, think of this operation as "AĀ". In statistics,
say you have a set "A", then "not A" is commonly written as A with an overline
"Ā". So ``AA_`` represents "AĀ", and that it first returns the selection A.

:param idxs: indexes of the elements to select. If empty, selects every index
:param wraps: if True, then the first example will return [[5, [1, 6, 3, 7]]]
    instead, so that A has the same signature as Ā"""
        super().__init__(); self.idxs = idxs; self.wraps = wraps
    def __ror__(self, it:List[Any]) -> List[List[List[Any]]]:
        """Splits ``it`` into ``[selected element, everything else]`` for every
        index. Raises IndexError if an index is out of range."""
        super().__ror__(it); idxs = self.idxs; it = list(init.dfGuard(it)); n = len(it)
        if len(idxs) == 0: idxs = range(n)
        def gen(idx):
            # enumerate() only produces non-negative positions, so a negative index
            # has to be normalized or else the selected element is never excluded
            pos = idx + n if idx < 0 else idx
            return [it[idx], [v for i, v in enumerate(it) if i != pos]]
        if not self.wraps and len(idxs) == 1: return gen(idxs[0])
        return [gen(idx) for idx in idxs]
class peek(BaseCli):
    def __init__(self):
        """Returns (firstRow, iterator). This sort of peeks at the first row,
to potentially gain some insights about the internal formats. The returned
iterator is not tampered. Example::

    e, it = iter([[1, 2, 3], [1, 2]]) | peek()
    print(e) # prints "[1, 2, 3]"
    s = 0
    for e in it: s += len(e)
    print(s) # prints "5", or length of 2 lists

If the input is empty, then this returns ``(None, [])``.

You kinda have to be careful about handling the ``firstRow``, because you might
inadvertently alter the iterator::

    e, it = iter([iter(range(3)), range(4), range(2)]) | peek()
    e = list(e) # e is [0, 1, 2]
    list(next(it)) # supposed to be the same as `e`, but is [] instead

The example happens because you have already consumed all elements of the first
row, and thus there aren't any left when you try to call ``next(it)``."""
        super().__init__()
    def __ror__(self, it:Iterator[Any]) -> Tuple[Any, Iterator[Any]]:
        """Returns the first row of ``it`` together with an iterable that still
        contains every row, including the first one."""
        # arrays, DataFrames and Series can be indexed without consuming anything,
        # so they're handed back as-is
        if isinstance(it, settings.arrayTypes):
            try: return it[0], it
            except: return None, []
        if hasPandas and isinstance(it, pd.DataFrame):
            try: return it[:1].to_numpy()[0], it
            except: return None, []
        if hasPandas and isinstance(it, pd.core.arraylike.OpsMixin):
            try: return it[0], it
            except: return None, []
        stream = iter(it); missing = object()
        first = next(stream, missing)
        if first is missing: return None, []
        def restored(): # puts the consumed first row back in front of the rest
            yield first; yield from stream
        return first, restored()
class peekF(BaseCli):
    def __init__(self, f:Union[BaseCli, Callable[[Any], Any]]):
        r"""Similar to :class:`peek`, but will execute ``f(row)`` and
return the input Iterator, which is not tampered. Example::

    it = lambda: iter([[1, 2, 3], [1, 2]])
    # prints "[1, 2, 3]" and returns [[1, 2, 3], [1, 2]]
    it() | peekF(lambda x: print(x)) | deref()
    # prints "1\n2\n3"
    it() | peekF(headOut()) | deref()

If the input is empty, then ``f`` is not executed and this returns ``[]``.

:param f: function that gets executed on the first row"""
        super().__init__(fs=[f]); self.f = f
    def __ror__(self, it:Iterator[Any]) -> Iterator[Any]:
        """Executes ``f`` on the first row, then returns an iterable that
        still contains every row. Every failure path returns ``[]``."""
        f = self.f
        # NOTE: on the 3 indexable paths below, any exception (including ones
        # raised by f itself) is treated as "input is empty"
        if isinstance(it, settings.arrayTypes):
            try: f(it[0]); return it
            except Exception: return []
        if hasPandas:
            if isinstance(it, pd.DataFrame):
                try: f(it[:1].to_numpy()[0]); return it
                except Exception: return []
            if isinstance(it, pd.core.arraylike.OpsMixin):
                try: f(it[0]); return it
                except Exception: return [] # was `None, []`, copied over from peek(), which breaks downstream iteration
        it = iter(it); sentinel = object(); row = next(it, sentinel)
        if row is sentinel: return []
        def gen(): yield row; yield from it # puts the consumed first row back in front
        f(row); return gen()
# registers the `settings.repeat` group, which holds `infBs`
settings.add(
    "repeat",
    k1lib.Settings().add(
        "infBs",
        100,
        "if dealing with infinite lists, how many elements at a time should be processed?",
    ),
    "settings related to repeat() and repeatFrom()",
)
class repeat(BaseCli):
    def __init__(self, limit:int=None):
        """Yields a specified amount of the passed in object. If you intend
        to pass in an iterator, then make a list out of it first, as second copy of
        iterator probably won't work as you will have used it the first time. Example::

            # returns [[1, 2, 3], [1, 2, 3], [1, 2, 3]]
            [1, 2, 3] | repeat(3) | deref()

        If ``limit`` is not specified, then this will repeat indefinitely, which may or may
        not cause a problem downstream. It could be that some downstream operators want to
        get the full view of all elements in the input iterator, but because it's an infinite
        iterator, it will never complete executing and get the full view that it wants::

            [1, 2] | repeat() | deref() | headOut() # will hang indefinitely, fill RAM up and crash after a while
            [1, 2] | (repeat() | deref()) | headOut() # will not hang, and actually print out the first 10 lines, each line is "[1, 2]"

        The solution is to just make a group that starts with ``repeat()``. Then ``repeat()``
        can capture all downstream operations, and actually batch up the infinite iterator,
        pass each batch into the downstream operations, and merge the results together. The
        batch size can be controlled by ``settings.cli.repeat.infBs = 200`` if you need it
        for some reason.

        See also: :meth:`repeatF`

        :param limit: if None, then repeats indefinitely"""
        super().__init__(capture=True); self.limit = limit
    def _typehint(self, inp): return tIter(inp)
    def _all_array_opt(self, it, level):
        """Array fast path: repeats by broadcasting along a new axis inserted at ``level``.

        :return: the broadcasted array piped through the captured clis, or
            ``NotImplemented`` if the limit is infinite or ``it`` is neither a
            numpy array nor a torch tensor"""
        if self.limit is None or self.limit == float("inf"): return NotImplemented
        sh = it | cli.shape(); s = slice(None, None, None)
        # `level` full slices followed by None: inserts a length-1 axis at position `level`
        it = it[(*[s]*level, None)]; newSh = [*sh[:level], self.limit, *sh[level:]]
        if isinstance(it, np.ndarray): return np.broadcast_to(it, newSh) | self.capturedSerial.all(level)
        elif hasTorch and isinstance(it, torch.Tensor): return it.expand(newSh) | self.capturedSerial.all(level)
        return NotImplemented
    def __ror__(self, o:Any) -> Iterator[Any]:
        """Repeats ``o`` for ``limit`` times and pipes the result through the captured clis."""
        limit = self.limit if self.limit is not None else k1lib.settings.cli.inf; ser = self.capturedSerial
        def gen():
            for i in itertools.count():
                if i >= limit: break
                yield o
        if limit < float("inf"):
            if isinstance(o, settings.arrayTypes):
                ans = self._all_array_opt(o, 0)
                # array types that the fast path can't deal with fall through to the generic path
                if ans is not NotImplemented: return ans
            return gen() | ser
        # infinite stream: feed the captured clis one batch at a time so that they can't hang
        return (gen() | ser) if len(self.capturedClis) == 0 else gen() | cli.batched(settings.repeat.infBs) | cli.apply(ser) | cli.joinStreams()
    def _jsF(self, meta):
        """Transpiles this cli to JS. Raises an Exception if ``limit`` is None."""
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()
        if self.limit is None: raise Exception("repeat._jsF() can't repeat the input array infinitely many times, as there's no iterator concept in JS")
        return f"{fIdx} = ({dataIdx}) => {dataIdx}.repeat({cli.kjs.v(self.limit)})", fIdx
class _repeatF(BaseCli):
    """Backend of :func:`repeatF`. The function to call is the object piped in."""
    def __init__(self, limit, kwargs):
        super().__init__(capture=True)
        self.limit = limit
        self.kwargs = kwargs
    def __ror__(self, f):
        """Calls ``f`` for ``limit`` times and pipes the results through the captured clis."""
        fn = fastF(f); extra = self.kwargs; downstream = self.capturedSerial
        maxCalls = k1lib.settings.cli.inf if self.limit is None else self.limit
        def stream():
            # keyword arguments are only forwarded when there are any
            call = fn if len(extra) == 0 else (lambda: fn(**extra))
            for idx in itertools.count():
                if idx >= maxCalls: return
                yield call()
        if maxCalls < float("inf") or len(self.capturedClis) == 0:
            return stream() | downstream
        # infinite stream with captured clis: process it one batch at a time
        return stream() | cli.batched(settings.repeat.infBs) | cli.apply(downstream) | cli.joinStreams()
def repeatF(f=None, limit:int=None, **kwargs):
    """Calls a function over and over again and yields the results.
    Example::

        repeatF(lambda: 4, 3) | deref() # returns [4, 4, 4]
        (lambda: 4) | repeatF(limit=3) | deref() # returns [4, 4, 4]
        repeatF(lambda: 4) | head() | shape(0) # returns 10

        f = lambda a: a+2
        repeatF(f, 3, a=6) | deref() # returns [8, 8, 8]
        f | repeatF(limit=3, a=6) | deref() # returns [8, 8, 8]

    See :class:`repeat` for a discussion on how to deal with infinite iterators.
    If you want to do something like that, then you have to pipe in f instead of
    specifying f in the params directly.

    See also: :class:`repeatFrom`

    :param f: function to repeat. If None, then you can pipe the function in
    :param limit: if None, then repeats indefinitely
    :param kwargs: extra keyword arguments that you can pass into the function"""
    backend = _repeatF(limit, kwargs)
    if f is None:
        return backend
    return f | backend
class repeatFrom(BaseCli):
    def __init__(self, limit:int=None):
        """Yields from a list. If runs out of elements, then do it again for
        ``limit`` times. Example::

            # returns [1, 2, 3, 1, 2]
            [1, 2, 3] | repeatFrom() | head(5) | deref()
            # returns [1, 2, 3, 1, 2, 3]
            [1, 2, 3] | repeatFrom(2) | deref()

        See :class:`repeat` for a discussion on how to deal with infinite iterators

        .. note::

            For advanced users who want to modify the resulting stream mid-way, read this section

            Because this reuses elements inside the input iterator, it's necessary
            that the input feels like a list and not an iterator. So in order to make
            this work::

                # returns [1, 2, 3, 1, 2, 3]
                iter([1, 2, 3]) | repeatFrom(2) | deref()

            It's necessary to turn the input iterator into a list. However, sometimes you
            may want to update the input iterator values, so as to make things extra
            dynamic, like this::

                l = [1, 2, 3]
                def g(): yield from l; yield from l
                def h():
                    for i, e in enumerate(g()):
                        if i == 3: l.append(5) # modifies the list mid-way
                        yield e
                h() | deref() # returns [1, 2, 3, 1, 2, 3, 5]

            But if you do this, it wouldn't work::

                l = [1, 2, 3]
                def h():
                    for i, e in enumerate(iter(l) | repeatFrom(2)):
                        if i == 3: l.append(5)
                        yield e
                h() | deref() # returns [1, 2, 3, 1, 2, 3]

            This is because internally, :class:`repeatFrom` turns the iterator into a
            list, and continues yielding from that list, and thus won't use the updated
            values. To do it, you have to make the input feel like a list (can get length)::

                l = [1, 2, 3]
                def h():
                    for i, e in enumerate(l | repeatFrom(2)): # ---------------- changed section
                        if i == 3: l.append(5)
                        yield e
                h() | deref() # returns [1, 2, 3, 1, 2, 3, 5]

        :param limit: if None, then repeats indefinitely. 0 yields nothing"""
        super().__init__(capture=True); self.limit = limit
    def _typehint(self, inp):
        i = tAny()
        if isinstance(inp, tListIterSet): i = inp.child
        if isinstance(inp, tArrayTypes): i = inp
        return tIter(i)
    def _all_array_opt(self, it, level:int):
        """Array fast path: repeats the array ``limit`` times and joins the copies
        together at ``level``. Returns ``NotImplemented`` if the limit is infinite."""
        if self.limit is None or self.limit == float("inf"): return NotImplemented
        return it | (repeat(self.limit) | joinStreams()).all(level) | self.capturedSerial.all(level)
    def __ror__(self, it:Iterator[Any]) -> Iterator[Any]:
        """Yields all elements of ``it`` for ``limit`` rounds and pipes the result through the captured clis."""
        # explicit None check, so that limit=0 is not mistaken for "repeat forever"
        limit = self.limit if self.limit is not None else k1lib.settings.cli.inf; ser = self.capturedSerial
        def gen(it):
            # things without a length (iterators) can only be consumed once, so cache them
            try: len(it)
            except Exception: it = list(it)
            for i in itertools.count():
                if i >= limit: break
                yield from it
        if limit < float("inf"):
            if isinstance(it, settings.arrayTypes): return self._all_array_opt(it, 0)
            if hasPandas and isinstance(it, (pd.DataFrame, pd.Series)):
                if limit == 0: return it.iloc[:0] | ser # pd.concat() refuses to join an empty list
                if isinstance(it, pd.DataFrame): return pd.DataFrame({s.name:pd.concat([s]*limit, ignore_index=True) for s in it | T()}) | ser
                return pd.concat([it]*limit, ignore_index=True) | ser
            return gen(it) | ser
        # infinite stream: feed the captured clis one batch at a time so that they can't hang
        it = init.dfGuard(it); return (gen(it) | ser) if len(self.capturedClis) == 0 else gen(it) | cli.batched(settings.repeat.infBs) | cli.apply(ser) | cli.joinStreams()
    def _jsF(self, meta):
        """Transpiles this cli to JS. Raises an Exception if ``limit`` is None."""
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()
        if self.limit is None: raise Exception("repeatFrom._jsF() can't repeat the input array infinitely many times, as there's no iterator concept in JS")
        return f"{fIdx} = ({dataIdx}) => {dataIdx}.repeatFrom({cli.kjs.v(self.limit)})", fIdx
def oneHotRow(i, n):
    """Builds a list of ``n`` zeros with a single 1 at index ``i``.

    :param i: index of the hot element, with normal list indexing rules
    :param n: length of the row"""
    row = [0] * n
    row[i] = 1
    return row
class oneHot(BaseCli):
    _groups = {} # group name -> {category: encoded row}, shared by all instances using that group
    def __init__(self, col, n:int=0, group:str=None, sep:bool=False):
        """One-hot encode some column in a table. Example::

            a = [
                [1, 2, "A"],
                [3, 4, "B"],
                [5, 6, "C"]]
            b = [
                [7, 8, "A"],
                [9, 10, "B"],
                [11, 12, "B"]]
            [*a, *b] | oneHot(2) | deref()
            [*a, *b] | oneHot(2, 3, "abcd") | deref()

        Last 2 statements both return this::

            [[1, 2, 1, 0, 0],
             [3, 4, 0, 1, 0],
             [5, 6, 0, 0, 1],
             [7, 8, 1, 0, 0],
             [9, 10, 0, 1, 0],
             [11, 12, 0, 1, 0]]

        You can also separate the encoded column out like this::

            [*a, *b] | oneHot(2, sep=True) | deref()

        Which returns this::

            [[1, 2, [1, 0, 0]],
             [3, 4, [0, 1, 0]],
             [5, 6, [0, 0, 1]],
             [7, 8, [1, 0, 0]],
             [9, 10, [0, 1, 0]],
             [11, 12, [0, 1, 0]]]

        The natural way to do this is to use this without ``n`` and ``group`` parameters.
        But sometimes, your one hot encoding is spread across multiple datasets in
        multiple dataloaders, and so the order and length of the encoding might not be
        the same, which will mess up your training process.

        That's why, you can specify ``group``, which will share encoding information
        across all :class:`oneHot` clis that have the same group name. If you choose to
        do this then you have to also specify what's the size of the encoding, because
        the cli can't really infer the size when it potentially has not seen all the data

        :param col: which column one hot encode and expand into
        :param n: (optional) total number of different elements
        :param group: (optional) group name
        :param sep: (optional) whether to separate the variable out into its own list
        :raises Exception: if only one of ``n`` and ``group`` is specified"""
        self.col = col; self.n = n; self.group = group; self.sep = sep
        # exactly one of the two is given
        if (n != 0) != (group is not None):
            raise Exception("You have to specify both `n` and `group` at the same time if you want to use them")
        if group is not None:
            if group not in oneHot._groups:
                oneHot._groups[group] = dict()
            self.d = oneHot._groups[group]
        else: self.d = dict()
    def _typehint(self, inp):
        # TODO: derive a more precise type from `inp`
        return tIter(tAny())
    def __ror__(self, it):
        """Yields every row of the table with column ``col`` replaced by its encoding.

        If ``n`` is 0, the table is dereferenced first and the encoding size is the
        number of distinct values in the column. Categories get their slot in the
        order in which they are first encountered."""
        c = self.col; d = self.d; n = self.n; sep = self.sep; it = init.dfGuard(it)
        if n == 0:
            it = it | cli.deref(2); n = it | cli.cut(c) | cli.aS(set) | cli.shape(0)
        for row in it:
            e = row[c]
            # categories are dictionary keys, so unhashable ones (lists, ...) are frozen into tuples
            try: hash(e)
            except TypeError: e = tuple(e)
            if e not in d: d[e] = oneHotRow(len(d), n)
            if sep: yield [*row[:c], d[e], *row[c+1:]]
            else: yield [*row[:c], *d[e], *row[c+1:]]
class latch(BaseCli):
    def __init__(self, timeline, before=True, startTok=None, endTok=None):
        """Attaches the data of a sparse timeline (B) to every row of another timeline (A).
        Example::

            a = [[1, "a"], [2, "b"], [3, "c"], [4, "d"], [5, "e"], [6, "f"], [7, "g"], [8, "h"], [9, "i"]]
            b = [[2.1, "2.1"], [3.1, "3.1"]]
            c = [[4.5, "4.5"], [7.3, "7.3"]]
            a[:5] | latch(b) | deref() # returns [[1, 'a', None], [2, 'b', None], [3, 'c', '2.1'], [4, 'd', '3.1'], [5, 'e', '3.1']]
            a[:5] | latch(b, False) | deref() # returns [[1, 'a', '2.1'], [2, 'b', '2.1'], [3, 'c', '3.1'], [4, 'd', None], [5, 'e', None]]
            b | latch(a) | deref() # returns [[2.1, '2.1', 'b'], [3.1, '3.1', 'c']]
            b | latch(a, False) | deref() # returns [[2.1, '2.1', 'c'], [3.1, '3.1', 'd']]

            # latching timelines that don't overlap
            b | latch(c) | deref() # returns [[2.1, '2.1', None], [3.1, '3.1', None]]
            b | latch(c, False) | deref() # returns [[2.1, '2.1', '4.5'], [3.1, '3.1', '4.5']]
            c | latch(b) | deref() # returns [[4.5, '4.5', '3.1'], [7.3, '7.3', '3.1']]
            c | latch(b, False) | deref() # returns [[4.5, '4.5', None], [7.3, '7.3', None]]

            # multiple latches also work fine
            a | latch(b) | latch(c) | deref()

        Final command returns this::

            [[1, 'a', None, None],
             [2, 'b', None, None],
             [3, 'c', '2.1', None],
             [4, 'd', '3.1', None],
             [5, 'e', '3.1', '4.5'],
             [6, 'f', '3.1', '4.5'],
             [7, 'g', '3.1', '4.5'],
             [8, 'h', '3.1', '7.3'],
             [9, 'i', '3.1', '7.3']]

        So essentially, there're 2 timelines, "a" and "b". "a" is usually more densely
        packed than "b". The goal is to merge these 2 timelines together, trying to insert
        b's data points at various points inside a. Try to figure out the pattern and
        behavior from the example.

        This cli makes several assumptions:

        - All timelines inside the argument (if ``a | latch(b)``, then I'm talking about b) have only 2 columns: (time, data)
        - Timelines piped into (if ``a | latch(b)``, then I'm talking about a) can have multiple columns. First column has to be time
        - All timelines are sorted by time already, to avoid resorting

        See also: :class:`~k1lib.cli.utils.syncStepper`

        :param timeline: sparser timeline
        :param before: whether to use the timeline B's point before or after timeline A's point
        :param startTok: token to use for A's samples where B has not gone into the picture yet
        :param endTok: token to use for A's samples where B has yielded all elements"""
        self.timeline = timeline
        self.before = before
        self.startTok = startTok
        self.endTok = endTok
    def __ror__(self, it):
        """Yields every row of ``it`` with the matching data point of the latched timeline appended."""
        # sentinel rows at -inf and +inf: every time in `it` falls between 2 points of B
        opening = [[-float("inf"), self.startTok]]
        closing = [[float("inf"), self.endTok]]
        points = [opening, self.timeline, closing] | cli.joinSt()
        useBefore = self.before
        _, prevDesc = next(points)
        nextT, nextDesc = next(points)
        for row in init.dfGuard(it):
            # advance B until its next point is no longer earlier than this row
            while row[0] > nextT:
                prevDesc = nextDesc
                nextT, nextDesc = next(points)
            yield [*row, prevDesc if useBefore else nextDesc]