# Source code for k1lib.cli.structural

# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
"""
This is for functions that change the table structure in dramatic ways.
They're the core transformations
"""
from typing import List, Union, Iterator, Callable, Any, Tuple, Dict
from collections import defaultdict, Counter, deque
from k1lib.cli.init import patchDefaultDelim, BaseCli, oneToMany, fastF, yieldT
import k1lib.cli as cli; import k1lib.cli.init as init; from k1lib.cli.typehint import *
import itertools, numpy as np, k1lib, math, json; plt = k1lib.dep.plt
try: import torch; hasTorch = True
except: torch = k1lib.Object().withAutoDeclare(lambda: type("RandomClass", (object, ), {})); hasTorch = False
try: import pandas as pd; pd.core; hasPandas = True
except: hasPandas = False
__all__ = ["transpose", "T", "reshape", "insert", "splitW", "splitC",
           "joinStreams", "joinSt", "joinStreamsRandom", "activeSamples",
           "table", "batched", "batchedTrigger", "window", "groupBy", "ungroup",
           "insertColumn", "insertIdColumn", "insId",
           "unsqueeze",
           "count", "hist", "permute", "AA_", "peek", "peekF",
           "repeat", "repeatF", "repeatFrom", "oneHot", "latch"]
settings = k1lib.settings.cli
class transpose(BaseCli):
    def __init__(self, dim1:int=0, dim2:int=1, fill=None):
        """Joins multiple columns and loops through all rows. Aka transpose.
Example::

    # returns [[1, 4], [2, 5], [3, 6]]
    [[1, 2, 3], [4, 5, 6]] | transpose() | deref()
    # returns [[1, 4], [2, 5], [3, 6], [0, 7]]
    [[1, 2, 3], [4, 5, 6, 7]] | transpose(fill=0) | deref()

Multidimensional transpose works just like :meth:`torch.transpose` too::

    # returns (2, 7, 5, 3); a Tensor is detected, so this uses the builtin :meth:`torch.transpose`
    torch.randn(2, 3, 5, 7) | transpose(3, 1) | shape()
    # also returns (2, 7, 5, 3), but actually does every required computation. Can be slow if the shape is huge
    torch.randn(2, 3, 5, 7) | deref(igT=False) | transpose(3, 1) | shape()

Can also work with numpy arrays::

    # returns (5, 3, 2)
    np.random.randn(2, 3, 5) | transpose(0, 2) | op().shape

Be careful with infinite streams, as transposing a stream of shape (inf, 5) will hang
this operation! Either don't do it, or temporarily limit all infinite streams like this::

    with settings.cli.context(inf=21):
        # returns (3, 21)
        [2, 1, 3] | repeat() | transpose() | shape()

Also be careful with empty streams, as you might not get any results at all::

    # returns [], as the last stream has no elements
    [[1, 2], [3, 4], []] | transpose() | deref()
    # returns [[1, 3, 0], [2, 4, 0]]
    [[1, 2], [3, 4], []] | transpose(fill=0) | deref()

This also has an alias of ``T()``, so you can make it shorter, like
``np.random.randn(2, 3, 5) | T()``

:param fill: if not None, then will try to zip longest with this fill value"""
        super().__init__(); self.fill = fill
        self.d1 = min(dim1, dim2); self.d2 = max(dim1, dim2)
        self.normal = self.d1 == 0 and self.d2 == 1
    def _all_array_opt(self, it, level:int): return it | transpose(self.d1 + level, self.d2 + level, self.fill)
    def _typehint(self, inp):
        if isinstance(inp, tArrayTypes):
            if inp.rank is None: return inp
            if inp.rank > max(self.d1, self.d2): return inp
            else: return tAny() # this case doesn't quite exist
        if self.d1 == 0 and self.d2 == 1:
            if isinstance(inp, tListIterSet):
                if isinstance(inp.child, tListIterSet):
                    return tIter(tList(inp.child.child))
        return tAny()
    def __ror__(self, it):
        d1 = self.d1; d2 = self.d2; fill = self.fill
        if isinstance(it, torch.Tensor): return it.transpose(d1, d2)
        if isinstance(it, np.ndarray):
            dims = list(range(len(it.shape)))
            temp = dims[d1]; dims[d1] = dims[d2]; dims[d2] = temp
            return it.transpose(dims)
        if hasPandas and isinstance(it, pd.core.frame.DataFrame):
            if d1 != 0 or d2 != 1: raise Exception("Can't do multidimensional transpose on a data frame, as that makes no sense")
            return [it[k] for k in it]
        if d1 != 0 or d2 != 1: return it | cli.serial(*([transpose(fill=fill).all(i) for i in range(d1, d2)] + [transpose(fill=fill).all(i-1) for i in range(d2-1, d1, -1)]))
        if hasPandas:
            try:
                it[:]; len(it)
                if sum(isinstance(x, pd.core.frame.Series) for x in it[:5]) >= 2: # list of series, so we can turn it into a dataframe!
                    s = set([x for x in [getattr(x, "name", None) for x in it] if x])
                    def gen(): # generates column names that don't clash with existing series names
                        autoInc = k1lib.AutoIncrement(prefix="col_")
                        while True:
                            e = autoInc()
                            if e not in s: yield e
                    nameGen = gen(); return pd.DataFrame({getattr(e, "name", next(nameGen)):e for e in it})
            except: pass
        if self.fill is None: return zip(*it)
        else: return itertools.zip_longest(*it, fillvalue=fill)
    @staticmethod
    def fill(fill="", dim1:int=0, dim2:int=1):
        """Convenience method to fill in missing elements of a table.
Example::

    # returns [[1, 2, 3], [4, 5, 0]]
    [[1, 2, 3], [4, 5]] | transpose.fill(0) | deref()
    # also returns [[1, 2, 3], [4, 5, 0]], demonstrating how it works underneath
    [[1, 2, 3], [4, 5]] | transpose(fill=0) | transpose(fill=0) | deref()"""
        return transpose(dim1, dim2, fill=fill) | transpose(dim1, dim2, fill=fill)
    @staticmethod
    def wrap(f, dim1:int=0, dim2:int=1, fill=None):
        """Wraps ``f`` around 2 :class:`transpose`, can be useful in combination with
:class:`k1lib.cli.init.mtmS`. Example::

    # returns [[1, 4, 3, 4], [8, 81, 10, 11]]
    [range(1, 5), range(8, 12)] | transpose.wrap(mtmS.f(apply(op()**2), 1)) | deref()
    # also returns [[1, 4, 3, 4], [8, 81, 10, 11]], demonstrating the typical way to do this
    [range(1, 5), range(8, 12)] | apply(op()**2, 1) | deref()

The example given is mostly to demonstrate this. Most of the time, just use
:class:`~k1lib.cli.modifier.apply` with columns instead. But sometimes you need
direct access to a column, so this is how you can do it."""
        if not isinstance(f, BaseCli): f = cli.applyS(f)
        return transpose(dim1, dim2, fill) | f | transpose(dim1, dim2, fill)
    def _jsF(self, meta):
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()
        if self.d1 != 0 or self.d2 != 1: raise Exception(f"transpose._jsF() doesn't allow complex transpose across many dimensions yet")
        return f"{fIdx} = ({dataIdx}) => {dataIdx}.transpose({cli.kjs.v(self.fill)})", fIdx
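# A note on the generic-iterator path in __ror__ above: when d1 != 0 or d2 != 1, the
# transpose is decomposed into a chain of adjacent (0, 1) transposes applied at
# increasing depths via .all(). A minimal sketch of the idea (illustrative only):
#   # for d1=1, d2=3, the cli.serial(...) expression builds the chain
#   #   transpose().all(1) | transpose().all(2) | transpose().all(1)
#   # i.e. swap dims (1,2), then (2,3), then (1,2) again, which composes to
#   # swapping dims 1 and 3 while leaving dim 2 in place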
#tOpt.clearPasses()
def oTranspose1(cs, ts, metadata): # `transpose() | transpose().all()` to `transpose() | apply(aS(torch.stack) | transpose())`
    tr, ap = cs; t = ts[0]
    if (not ap.normal) or (not tr.normal): return None
    if (not isinstance(ap.f, transpose)) or (not ap.f.normal): return None
    if isinstance(t, tListIterSet) and (isinstance(t.child, tTensor) or isinstance(t.child, tCollection) and isinstance(tLowest(*t.child.children), tTensor)):
        return [transpose(), cli.apply(cli.aS(torch.stack) | transpose())]
tOpt.addPass(oTranspose1, [transpose, cli.apply])
T = transpose
def _formStructure(it, dims, dimI):
    if dimI >= len(dims): return next(it)
    return [_formStructure(it, dims, dimI+1) for i in range(dims[dimI])]
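# How `_formStructure` drives `reshape` below -- a minimal sketch (illustrative values only):
#   it = iter(range(6))
#   _formStructure(it, (2, 3), 1)   # -> [0, 1, 2]   (one row of a (2, 3) reshape)
#   _formStructure(it, (2, 3), 1)   # -> [3, 4, 5]   (the next row)
# The innermost recursion level calls next(it), so elements are consumed strictly in
# order, and a StopIteration from the underlying iterator naturally ends the stream.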
class reshape(BaseCli):
    def __init__(self, *dims):
        """Reshapes the input stream into the desired shape.
Example::

    # returns [[0, 1, 2], [3, 4, 5]]
    range(6) | reshape(2, 3) | deref()
    # returns [[0, 1], [2, 3], [4, 5]]
    range(6) | reshape(3, 2) | deref()
    # returns [[0, 1], [2, 3], [4, 5]], stopped early
    range(100) | reshape(3, 2) | deref()
    # returns [[0, 1, 2], [3, 4, 5]], can leave out first dimension
    range(6) | reshape(-1, 3) | deref()
    # returns [[0, 1, 2]], won't include 2nd element, as it ran out of elements
    range(5) | reshape(-1, 3) | deref()
    # throws error, as it ran out of elements and can't fulfill the request
    range(6) | reshape(3, 3) | deref()

Unlike :meth:`torch.reshape`, the input piped into this has to be a simple iterator.
If you have a complex data structure with multiple dimensions, turn that into a
simple iterator with :class:`joinStreams` first, like this::

    # returns [[[0, 1, 2]], [[3, 4, 5]]]
    [[[0], [1]], [[2], [3]], [[4], [5]]] | joinStreams(2) | reshape(2, 1, 3) | deref()"""
        self.dims = dims
    def __ror__(self, it):
        it = iter(init.dfGuard(it))
        if self.dims[0] == -1:
            try:
                while True: yield _formStructure(it, self.dims, 1)
            except StopIteration: pass
        else:
            for i in range(self.dims[0]): yield _formStructure(it, self.dims, 1)
class insert(BaseCli):
    def __init__(self, element, begin=True):
        """Inserts an element into the start (or end) of the list.
Example::

    # returns [5, 2, 6, 8]
    [2, 6, 8] | insert(5) | deref()
    # returns [2, 6, 8, 5]
    [2, 6, 8] | insert(5, begin=False) | deref()
    # returns [[3, 1], 2, 6, 8]
    [2, 6, 8] | insert([3, 1]) | deref()

:param element: the element to insert"""
        super().__init__(); self.element = element; self.begin = begin; self.expand = False
    def _all_array_opt(self, it, level):
        element = self.element
        if (it | cli.shape())[level+1:] != (element | cli.shape()): return NotImplemented
        e = (element | (cli.toNdArray(it.dtype) if isinstance(it, np.ndarray) else cli.toTensor(it.dtype)))[(*[None]*(level+1),)]
        for i in range(level): e = e | cli.repeatFrom(it.shape[i]).all(i)
        return (np if isinstance(it, np.ndarray) else torch).concatenate([e, it] if self.begin else [it, e], level)
    def __ror__(self, it):
        element = self.element; begin = self.begin
        if isinstance(it, settings.arrayTypes):
            if (it | cli.shape())[1:] == (element | cli.shape()):
                if isinstance(it, np.ndarray):
                    b = (element | cli.toNdArray())[None]
                    return np.concatenate([b, it] if begin else [it, b], axis=0)
                else:
                    b = (element | cli.toTensor())[None]
                    return torch.concatenate([b, it] if begin else [it, b], axis=0)
        if hasPandas and isinstance(it, pd.core.frame.DataFrame):
            try:
                newE = pd.DataFrame({c:[e] for c,e in zip(list(it), self.element)})
                return pd.concat([newE, it] if self.begin else [it, newE], ignore_index=True)
            except: it = init.dfGuard(it)
        if hasPandas and isinstance(it, pd.core.frame.Series):
            try: return pd.concat([pd.Series([self.element]), it], ignore_index=True)
            except: pass
        # if the input is sliceable, then it must be the output of some process that's not
        # very heavy, so let's return a list instead of going through an iterator
        try: it[:]; len(it); return [self.element, *it] if begin else [*it, self.element]
        except: pass
        def gen():
            _it = iter(it)
            if begin: yield element; yield from _it
            else: yield from _it; yield element
        return gen()
    def _jsF(self, meta):
        fIdx = init._jsFAuto(); elemIdx = init._jsDAuto(); dataIdx = init._jsDAuto()
        return f"{elemIdx} = {json.dumps(self.element)}; {fIdx} = ({dataIdx}) => {dataIdx}.insert({elemIdx}, {cli.kjs.v(self.begin)})", fIdx
class splitW(BaseCli):
    def __init__(self, *weights:List[float]):
        """Splits elements into multiple weighted lists. If no weights are provided,
defaults to [0.8, 0.2]. Example::

    # returns [[0, 1, 2, 3, 4, 5, 6, 7], [8, 9]]
    range(10) | splitW(0.8, 0.2) | deref()
    # same as the above
    range(10) | splitW() | deref()

This also works with array types::

    # returns 2 tensors with shapes (80, 3) and (20, 3)
    torch.randn(100, 3) | splitW()

See also: :class:`splitC`"""
        super().__init__()
        if len(weights) == 0: weights = [0.8, 0.2]
        self.weights = np.array(weights)
    def __ror__(self, it):
        try: it[:]; len(it)
        except: it = list(it)
        ws = self.weights; c = 0; ws = (ws * len(it) / ws.sum()).astype(int)
        if isinstance(it, settings.arrayTypes) or (hasPandas and isinstance(it, pd.core.arraylike.OpsMixin)):
            ans = []
            for w in ws[:-1]: ans.append(it[c:c+w]); c += w
            ans.append(it[c:]); return ans
        def gen():
            c = 0
            for w in ws[:-1]: yield it[c:c+w]; c += w
            yield it[c:]
        return gen()
    def _jsF(self, meta):
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()
        return f"{fIdx} = ({dataIdx}) => {dataIdx}.splitW({cli.kjs.vs(self.weights) | cli.join(', ')})", fIdx
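# A worked example of the weight arithmetic in splitW.__ror__ (illustrative values only):
#   weights = np.array([0.8, 0.2]); n = 10
#   (weights * n / weights.sum()).astype(int)   # -> array([8, 2])
# The last section always takes `it[c:]`, so any elements lost to integer truncation
# end up in the final section rather than being dropped.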
class splitC(BaseCli):
    def __init__(self, *checkpoints:List[float]):
        """Splits elements into multiple checkpoint-delimited lists.
Example::

    # returns [[0, 1], [2, 3, 4], [5, 6, 7, 8, 9]]
    range(10) | splitC(2, 5) | deref()
    # returns ['01', '234', '56789']
    "0123456789" | splitC(2, 5) | deref()

Here, you're specifying 2 checkpoints, 2 and 5, so it will split the list up into 3
sections. The first section is 0-2, the second section is 2-5, and the third section
is 5-end. You can pass in fractional checkpoints too::

    # returns [[0, 1], [2, 3, 4, 5], [6, 7, 8, 9]]
    range(10) | splitC(0.2, 0.6) | deref()

This cli might be unintuitive to remember, so if you just want to split things into
2 parts, check out :meth:`~k1lib.cli.filt.head.split`. If you want to split things up
by weighted length, check out :class:`splitW`"""
        self.checkpoints = checkpoints | cli.aS(np.array)
        self.intMode = checkpoints | cli.apply(lambda x: int(x) == x) | cli.aS(all)
    def __ror__(self, it):
        try: it[:]; len(it)
        except: it = list(it)
        cs = self.checkpoints
        if not self.intMode: cs = (cs * len(it)).astype(int)
        cs = sorted(cs)
        if isinstance(it, settings.arrayTypes) or (hasPandas and isinstance(it, pd.core.arraylike.OpsMixin)):
            ans = [it[:cs[0]]]
            for i in range(len(cs)-1): ans.append(it[cs[i]:cs[i+1]])
            ans.append(it[cs[-1]:]); return ans
        def gen():
            yield it[:cs[0]]
            for i in range(len(cs)-1): yield it[cs[i]:cs[i+1]]
            yield it[cs[-1]:]
        return gen()
    def _jsF(self, meta):
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()
        return f"{fIdx} = ({dataIdx}) => {dataIdx}.splitC({cli.kjs.vs(self.checkpoints) | cli.join(', ')})", fIdx
class joinStreams(BaseCli):
    def __init__(self, dims=1):
        """Joins multiple streams.
Example::

    # returns [1, 2, 3, 4, 5]
    [[1, 2, 3], [4, 5]] | joinStreams() | deref()
    # returns [[0, 1], [2], [3, 4, 5], [6, 7, 8], [], [9, 10]]
    [[[0, 1], [2], [3, 4, 5]], [[6, 7, 8], [], [9, 10]]] | joinStreams() | deref()
    # returns [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    [[[0, 1], [2], [3, 4, 5]], [[6, 7, 8], [], [9, 10]]] | joinStreams(2) | deref()

If you pass in :class:`numpy.ndarray` or :class:`torch.Tensor`, then it will
automatically use the C-accelerated version, like this::

    # returns Tensor with shape (6, 4)
    torch.randn(2, 3, 4) | joinStreams()
    # returns array with shape (6, 4)
    np.random.randn(2, 3, 4) | joinStreams()

Sometimes, you may want to impose some dimensional structure after joining all
streams together, which :class:`reshape` does.

If "joinStreams" is too unintuitive to remember, there's also an alias called
:class:`joinSt`.

:param dims: how many ``joinStreams()`` do you want to do consecutively?"""
        if dims < 0: raise AttributeError(f"`dims` ({dims}) can't be less than 0, as it doesn't make any sense!")
        self.dims = dims; self.multi = cli.serial(*(joinStreams() for d in range(dims))) if dims > 1 else None
    def _all_array_opt(self, it, level:int):
        if self.dims == 0: return it
        sh = tuple(it.shape)
        if len(sh) < level+self.dims+1: raise init.ArrayOptException(f"joinSt({self.dims}).all({level}) can't be applied to an array of shape {sh}")
        return it.reshape([*sh[:level], sh[level:level+self.dims+1] | cli.toProd(), *sh[level+self.dims+1:]])
    def __ror__(self, streams:Iterator[Iterator[Any]]) -> Iterator[Any]:
        if isinstance(streams, settings.arrayTypes): return self._all_array_opt(streams, 0)
        if self.multi != None: return streams | self.multi
        elif self.dims == 1:
            def gen():
                for stream in init.dfGuard(streams): yield from stream
            return gen()
        else: return streams
    def _jsF(self, meta):
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()
        return f"{fIdx} = ({dataIdx}) => {dataIdx}.joinSt({cli.kjs.v(self.dims)})", fIdx
joinSt = joinStreams
def probScale(ps, t): # t from 0 -> 1, for typical usage
    l = np.log(ps); avg = l.mean()
    a = (l-avg)*t+avg; a -= a.max()
    ans = np.exp(a); return ans/ans.sum()
import random
def rand(n, ps=None):
    if ps is None:
        while True: yield random.randrange(n)
    else:
        while True: yield from np.random.choice(n, size=100, p=ps)
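# What `probScale` does, on concrete numbers (illustrative only): it interpolates
# between uniform probabilities (t=0) and the input distribution (t=1) in log space,
# then renormalizes:
#   ps = np.array([0.1, 0.9])
#   probScale(ps, 0)   # -> array([0.5, 0.5])   all streams equally likely
#   probScale(ps, 1)   # -> array([0.1, 0.9])   proportional to stream lengths
# Values of t beyond [0, 1] extrapolate, which is how joinStreamsRandom(alpha=100)
# below can almost exclusively favor the longer stream.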
class joinStreamsRandom(BaseCli):
    def __init__(self, alpha=0, ps=None):
        """Joins multiple streams randomly. If any stream runs out, then quits. If any
stream yields :data:`~k1lib.cli.init.yieldT`, then just ignores that result and
continues. Could be useful in active learning. Example::

    # could return [0, 1, 10, 2, 11, 12, 13, ...], with max length 20, typical length 18
    [range(0, 10), range(10, 20)] | joinStreamsRandom() | deref()
    stream2 = [[-5, yieldT, -4, -3], yieldT | repeat()] | joinStreams()
    # could return [-5, -4, 0, -3, 1, 2, 3, 4, 5, 6], demonstrating yieldT
    [range(7), stream2] | joinStreamsRandom() | deref()

By default, all streams are treated equally, and are yielded with equal probabilities.
However, you can tweak these probabilities a little bit, to your liking. This is
controlled by the parameter ``alpha``:

.. image:: ../images/probScale.png

If ``alpha`` is 0, then all probabilities will be the same. If ``alpha`` is 1, then
all probabilities are proportional to the length of the input stream. The original
intention was to vary ``alpha`` just from 0 to 1, but it can actually be any number::

    [range(0, 10), range(10, 100)] | joinStreamsRandom(0) | shape(0)    # returns around 21, because it favors both streams equally
    [range(0, 10), range(10, 100)] | joinStreamsRandom(1) | shape(0)    # returns around 90, because it favors the second stream 9x more
    [range(0, 10), range(10, 100)] | joinStreamsRandom(100) | shape(0)  # returns 90, because it highly favors the second stream
    [range(0, 10), range(10, 100)] | joinStreamsRandom(-100) | shape(0) # returns 10, because it highly favors the first stream

:param alpha: if not zero, does a weighted joining, instead of totally uniform probability
:param ps: if specified, use these probabilities, else try to determine from the lengths of the input streams"""
        super().__init__(); self.alpha = alpha; self.ps = ps
    def __ror__(self, streams:Iterator[Iterator[Any]]) -> Iterator[Any]:
        alpha = self.alpha; ps = self.ps; streams = list(init.dfGuard(streams)); nStreams = len(streams)
        if alpha != 0:
            if ps is None:
                try: ps = np.array([len(st) for st in streams])
                except:
                    streams = [list(st) for st in streams]
                    ps = np.array([len(st) for st in streams])
            else: ps = np.array(list(ps))*1.0
        else: ps = np.array([1/nStreams]*nStreams)
        ps = probScale(ps/ps.sum(), alpha)
        streams = [iter(st) for st in streams]
        try:
            for streamIdx in rand(len(streams), ps):
                o = next(streams[streamIdx])
                if not o is yieldT: yield o # "not ... is" to dodge numpy's `==` overload
        except StopIteration: pass
    def _jsF(self, meta):
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()
        if self.alpha != 0 and not self.ps is None: raise Exception("joinStreamsRandom._jsF() doesn't allow custom alpha and ps yet")
        return f"{fIdx} = ({dataIdx}) => {dataIdx}.joinStreamsRandom()", fIdx
class activeSamples(BaseCli):
    def __init__(self, limit:int=100, p:float=0.95):
        """Yields active learning samples.
Example::

    o = activeSamples()
    ds = range(10)                       # normal dataset
    ds = [o, ds] | joinStreamsRandom()   # dataset with active learning capability
    next(ds) # returns 0
    next(ds) # returns 1
    next(ds) # returns 2
    o.append(20)
    next(ds) # can return 3 or 20
    next(ds) # can return (4 or 20) or 4

So the point of this is to be a generator of samples. You can define your dataset
as a mix of active learning samples and standard samples. Whenever there's a data
point that you want to focus on, you can add it to ``o`` and it will eventually
yield it.

.. warning::

    It might not be a good idea to set param ``limit`` to numbers higher than 100.
    This is because the network might still not understand a wrong sample after
    being shown it multiple times, and will keep adding that wrong sample back in,
    distracting it from other samples, and reducing the network's accuracy after
    removing active learning from it.

    If ``limit`` is low enough (from my testing, 30-100 should be fine), then old
    wrong samples will be kicked out, allowing for a fresh stream of wrong samples
    coming in, and preventing the problem above. If you find that removing active
    learning makes the accuracy drop dramatically, then try decreasing the limit.

:param limit: max number of active samples. Discards samples if number of samples is over this.
:param p: probability of actually adding the samples in"""
        super().__init__(); self.p = p
        self.samples = deque([], limit)
    def append(self, item):
        """Adds 1 sample."""
        if random.random() < self.p: self.samples.append(item)
    def extend(self, items):
        """Adds multiple samples."""
        for item in items: self.append(item)
    def __iter__(self):
        samples = self.samples
        while True:
            if len(samples) == 0: yield yieldT
            else: yield samples.popleft()
def table(delim:str=None):
    """Basically ``op().split(delim).all()``. This exists because it's used quite a
lot in bioinformatics. Example::

    # returns [['a', 'bd'], ['1', '2', '3']]
    ["a|bd", "1|2|3"] | table("|") | deref()"""
    return cli.op().split(patchDefaultDelim(delim)).all()
def _batch(it, bs, includeLast, incl):
    l = []; it = iter(it); sentinel = object(); last = sentinel
    try:
        if incl: # each batch starts with the last element of the previous batch
            while True:
                if last is not sentinel: l.append(last)
                for i in range(bs if last is sentinel else bs-1): l.append(next(it))
                last = next(it); l.append(last)
                yield l; l = []
        else:
            while True:
                for i in range(bs): l.append(next(it))
                yield l; l = []
    except StopIteration:
        if includeLast and len(l) > 0: yield l
def _batchRange(it, bs, includeLast, incl):
    start, stop, step = it.start, it.stop, it.step
    lastCur = start; cur = lastCur + bs*step
    if incl:
        while cur <= stop: yield range(lastCur, min(cur+1, stop), step); lastCur = cur; cur += bs*step
    else:
        while cur <= stop: yield range(lastCur, cur, step); lastCur = cur; cur += bs*step
    if includeLast and lastCur < stop: yield range(lastCur, stop, step)
def _batchSliceable(it, bs, includeLast, incl):
    cur = 0; n = len(it)
    if incl:
        while (cur+1)*bs <= n: yield it[cur*bs:(cur+1)*bs+1]; cur += 1
    else:
        while (cur+1)*bs <= n: yield it[cur*bs:(cur+1)*bs]; cur += 1
    if includeLast:
        ans = it[cur*bs:(cur+1)*bs]
        if len(ans) > 0: yield ans
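# A quick sanity sketch of the `range` fast path (illustrative only): batching a
# range yields ranges instead of materialized lists, so no elements get copied:
#   list(_batchRange(range(0, 11), 3, False, False))
#   # -> [range(0, 3), range(3, 6), range(6, 9)]
#   list(_batchRange(range(0, 11), 3, True, False))
#   # -> [range(0, 3), range(3, 6), range(6, 9), range(9, 11)]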
class batched(BaseCli):
    def __init__(self, bs=32, includeLast=False, incl=False):
        """Batches the input stream.
Example::

    # returns [[0, 1, 2], [3, 4, 5], [6, 7, 8]]
    range(11) | batched(3) | deref()
    # returns [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10]]
    range(11) | batched(3, True) | deref()
    # returns [[0, 1, 2, 3, 4]]
    range(5) | batched(float("inf"), True) | deref()
    # returns []
    range(5) | batched(float("inf"), False) | deref()
    # returns [[0, 1, 2, 3], [3, 4, 5, 6], [6, 7, 8, 9]], each batch includes the first element of the next batch!
    range(11) | batched(3, incl=True) | deref()
    # returns [[0, 1, 2, 3], [3, 4, 5, 6], [6, 7, 8, 9], [9, 10]]
    range(11) | batched(3, True, incl=True) | deref()

Works well and fast with :class:`torch.Tensor` and :class:`numpy.ndarray`::

    # both return torch.Tensor of shape (2, 3, 4, 5)
    torch.randn(6, 4, 5) | batched(3)
    torch.randn(7, 4, 5) | batched(3)

Also, if the input is a :class:`range`, then to save time, a bunch of other ranges
will be returned instead of lists::

    # returns [range(0, 3), range(3, 6), range(6, 9)]
    range(11) | batched(3) | toList()

See also: :class:`window`, :class:`batchedTrigger`

:param bs: batch size
:param includeLast: whether to include the last, incomplete batch or not
:param incl: whether to have inclusive bounds or not"""
        super().__init__(); self.bs = bs; self.includeLast = includeLast; self.incl = incl
    def _all_array_opt(self, it, level):
        if self.includeLast: return NotImplemented
        bs = self.bs; a = [slice(None, None, None)]*level; n = it.shape[level] // bs; it = it[(*a, slice(None, n*bs))]
        return it.reshape(*it.shape[:level], n, bs, *it.shape[level+1:])
    def __ror__(self, it):
        bs = self.bs; includeLast = self.includeLast; incl = self.incl
        if bs == float("inf"): return [it] if includeLast else []
        if not incl and isinstance(it, k1lib.settings.cli.arrayTypes):
            if (not includeLast) or (it.shape[0]%bs == 0): n = it.shape[0] // bs; it = it[:n*bs]; return it.reshape(n, bs, *it.shape[1:])
        if not incl and isinstance(it, range): return _batchRange(it, bs, includeLast, incl)
        try: it[:]; len(it); return _batchSliceable(it, bs, includeLast, incl)
        except: return _batch(it, bs, includeLast, incl)
    def _jsF(self, meta):
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()
        return f"{fIdx} = ({dataIdx}) => {dataIdx}.batched({cli.kjs.v(self.bs)}, {cli.kjs.v(self.includeLast)})", fIdx
class batchedTrigger(BaseCli): # batches the data using a trigger column
    def __init__(self, col:int=None, value:Any=None, delta:float=None, adj:bool=True):
        """Like :class:`batched`, will batch rows/elements up, but based on a trigger
value instead. See :class:`~k1lib.cli.filt.trigger` for more discussion. Normal mode::

    data = [[1, "a"], [1, "b"], [2, "c"], [3, "d"], [3, "e"], [1, "f"]]
    # returns [[1, 1], [2], [3, 3], [1]]
    [1, 1, 2, 3, 3, 1] | batchedTrigger() | deref()
    # throws error, as it does not make sense to invert the search. Can only invert if .value is specified
    [1, 1, 2, 3, 3, 1] | ~batchedTrigger() | deref()
    # returns [[[1, 'a'], [1, 'b']], [[2, 'c']], [[3, 'd'], [3, 'e']], [[1, 'f']]]
    data | batchedTrigger(0) | deref()

In this mode, elements of the specified column that have the same value consecutively
will be grouped together into a batch. Note the difference between this and
:class:`groupBy`. Specific-value mode::

    # returns [[1, 1], [1]]
    [1, 1, 2, 3, 3, 1] | batchedTrigger(value=1) | deref()
    # returns [[2], [3, 3]], essentially only yields the batches that don't match the specified value
    [1, 1, 2, 3, 3, 1] | ~batchedTrigger(value=1) | deref()
    # returns [[[1, 'a'], [1, 'b']], [[1, 'f']]]
    data | batchedTrigger(0, value=1) | deref()

This mode is pretty much the same as previously, but will only return the batches
whose specified columns match (or don't match) a specific value. Finally, the delta
mode::

    # returns [[1, 1.1, 1.2], [2], [3], [3.55, 3.8, 3.9, 4.1], [6], [10]]
    [1, 1.1, 1.2, 2, 3, 3.55, 3.8, 3.9, 4.1, 6, 10] | batchedTrigger(delta=0.5) | deref()
    # returns [[1, 1.1, 1.2], [2], [3], [3.55, 3.8, 3.9], [4.1], [6], [10]]
    [1, 1.1, 1.2, 2, 3, 3.55, 3.8, 3.9, 4.1, 6, 10] | batchedTrigger(delta=0.5, adj=False) | deref()
    # .col also works here, but it's too long to fit here

In contrast to the previous 2 modes, this does not rely on exact values. Instead, 2
consecutive elements are considered to be in the same batch if their difference is
less than the provided ``delta`` value. Within delta mode, there's the parameter
``adj`` as well. If it's True, then it will measure the delta between the last element
in the group (3.9) and the current element (4.1), while if ``adj=False``, then it will
measure the delta between the first element in the group (3.55) and the current
element (4.1, which exceeds delta, splitting off a new batch).

See also: :class:`~k1lib.cli.filt.trigger`

:param col: column of the trigger value. Can be None, int, or list[int]
:param value: if the trigger value equals this, then yields the batch
:param delta: if the trigger value changes more than this, then yields the batch
:param adj: "adjacent", only makes sense if ``delta != None``. See example above"""
        self.col = col; self.value = value; self.delta = delta; self.adj = adj; self.inverted = False
        if value is not None and delta is not None: raise Exception("Can't specify a precise value and fuzzy values (by specifying delta) at the same time. That does not make sense! Leave either .value or .delta empty")
    def __invert__(self):
        if self.value is None: raise Exception("batchedTrigger().value is None! Inverting it would not make sense!")
        res = batchedTrigger(self.col, self.value); res.inverted = not self.inverted; return res
    def __ror__(self, it):
        col = self.col; value = self.value; delta = self.delta; adj = self.adj; inv = self.inverted
        it = init.dfGuard(it); arr = []; lastE = object() # sentinel
        # the reason for so many if statements is that I don't want the ifs inside the for loops, as that would be slow
        if value is None and delta is None: # normal mode
            if col is None:
                for e in it:
                    if e == lastE: arr.append(e)
                    else:
                        if len(arr): yield arr
                        arr = [e]
                    lastE = e
            elif isinstance(col, int):
                for row in it:
                    e = row[col]
                    if e == lastE: arr.append(row)
                    else:
                        if len(arr): yield arr
                        arr = [row]
                    lastE = e
            else:
                for row in it:
                    e = tuple([row[x] for x in col])
                    if e == lastE: arr.append(row)
                    else:
                        if len(arr): yield arr
                        arr = [row]
                    lastE = e
            if len(arr): yield arr
        elif value is not None: # normal mode, but only yields batches with the specified value
            if col is None:
                for e in it:
                    if e == lastE: arr.append(e)
                    else:
                        if len(arr) and (lastE == value) ^ inv: yield arr
                        arr = [e]
                    lastE = e
            elif isinstance(col, int):
                for row in it:
                    e = row[col]
                    if e == lastE: arr.append(row)
                    else:
                        if len(arr) and (lastE == value) ^ inv: yield arr
                        arr = [row]
                    lastE = e
            else:
                value = tuple(value)
                for row in it:
                    e = tuple([row[x] for x in col])
                    if e == lastE: arr.append(row)
                    else:
                        if len(arr) and (lastE == value) ^ inv: yield arr
                        arr = [row]
                    lastE = e
            if len(arr) and (lastE == value) ^ inv: yield arr
        elif delta is not None: # delta mode
            lastE = float("-inf"); arr = []
            if adj: # compare against the last element of the group
                if col is None:
                    for e in it:
                        if abs(e-lastE) < delta: arr.append(e)
                        else:
                            if len(arr): yield arr
                            arr = [e]
                        lastE = e
                elif isinstance(col, int):
                    for row in it:
                        e = row[col]
                        if abs(e-lastE) < delta: arr.append(row)
                        else:
                            if len(arr): yield arr
                            arr = [row]
                        lastE = e
                else: raise Exception("Multiple columns not supported in delta mode, as it doesn't make any sense")
            else: # compare against the first element of the group
                if col is None:
                    for e in it:
                        if abs(e-lastE) < delta: arr.append(e)
                        else:
                            if len(arr): yield arr
                            arr = [e]; lastE = e
                elif isinstance(col, int):
                    for row in it:
                        e = row[col]
                        if abs(e-lastE) < delta: arr.append(row)
                        else:
                            if len(arr): yield arr
                            arr = [row]; lastE = e
                else: raise Exception("Multiple columns not supported in delta mode, as it doesn't make any sense")
            if len(arr): yield arr
        else: raise Exception("Unreachable")
nothing = object()
class window(BaseCli):
    def __init__(self, n, newList=False, pad=nothing):
        """Slides a window of size n forward and yields the windows.
Example::

    # returns [[0, 1, 2], [1, 2, 3], [2, 3, 4]]
    range(5) | window(3) | deref()
    # returns [[0, 1, 2], [1, 2, 3], [2, 3, 4], [3, 4, None], [4, None, None]]
    range(5) | window(3, pad=None) | deref()

If you are doing strange transformations to the result, like transposing it, then it
might complain that the internal deque (double-ended queue) mutated during iteration.
In that case, set ``newList`` to True. It's not True by default because multiple
lists will be created, all of which need memory allocation, which is slower::

    # takes 15ms
    range(100000) | window(100) | ignore()
    # takes 48ms, because of allocating lists
    range(100000) | window(100, True) | ignore()

See also: :class:`~batched`

:param n: size of the window
:param newList: whether to create a new list out of every window or not. If False
    (default), less robust but faster. If True, more robust but slower
:param pad: if specified, pads the output stream at the end, so that it has the same
    number of elements as the input stream"""
        self.n = n; self.listF = (lambda x: list(x)) if newList else (lambda x: iter(x))
        self.pad = pad; self.padBool = pad is not nothing # why do this? Because in applyMp, "nothing" takes on multiple identities
    def _all_array_opt(self, it, level):
        n = it.shape[level]; w = self.n; h = n - w + 1
        i = np.arange(h)[:,None] + np.arange(w)[None]
        return it[(*[slice(None, None, None)]*level,i)]
    def __ror__(self, it):
        if not self.padBool and isinstance(it, settings.arrayTypes): return self._all_array_opt(it, 0)
        def gen():
            n = self.n; pad = self.pad; q = deque([], n); listF = self.listF
            for e in init.dfGuard(it):
                q.append(e)
                if len(q) == n: yield listF(q); q.popleft()
            if self.padBool:
                for i in range(n-1): q.append(pad); yield listF(q); q.popleft()
        return gen()
    def _jsF(self, meta):
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()
        if self.padBool: raise Exception("window._jsF() doesn't support custom pad value yet")
        return f"{fIdx} = ({dataIdx}) => {dataIdx}.window({cli.kjs.v(self.n)})", fIdx
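# The array fast path in window._all_array_opt builds all windows at once with index
# arithmetic -- a minimal standalone sketch (illustrative values only):
#   n, w = 5, 3; h = n - w + 1
#   i = np.arange(h)[:,None] + np.arange(w)[None]
#   # i -> [[0, 1, 2],
#   #       [1, 2, 3],
#   #       [2, 3, 4]]
#   np.arange(5)[i]   # -> the same (3, 3) array: every window of size 3
# Fancy-indexing with `i` turns an axis of length n into an (n-w+1, w) pair of axes
# in a single vectorized operation, with no Python-level loop.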
class groupBy(BaseCli):
    def __init__(self, column:int, separate:bool=False, removeCol:bool=None):
        """Groups table by some column.
Example::

    a = [[2.3, 5], [3.4, 2], [4.5, 2], [5.6, 5], [6.7, 1]]
    a | groupBy(1) | deref()

This returns::

    [[[6.7, 1]], [[3.4, 2], [4.5, 2]], [[2.3, 5], [5.6, 5]]]

Should have O(n log(n)) time complexity. What if ``separate`` is True::

    a | groupBy(1, True) | deref()

This returns::

    [[1, [[6.7]]], [2, [[3.4], [4.5]]], [5, [[2.3], [5.6]]]]

What if ``removeCol`` is False::

    a | groupBy(1, True, False) | deref()

This returns::

    [[1, [[6.7, 1]]], [2, [[3.4, 2], [4.5, 2]]], [5, [[2.3, 5], [5.6, 5]]]]

There's another way to think about this operation. A lot of libraries (like pandas)
expect the uncompressed, "flat" version (the variable ``a`` in the examples above).
But throughout my time using cli, the grouped version (separate=True) is usually much
more useful and amenable to transformations. It also occupies less memory, as the
columns with duplicated elements are deleted. So, you can sort of think of
:class:`groupBy` as converting pandas dataframes into a more easily digestible form.
But because of the prevalence of those libraries, after doing all the transformations
you want, it's sometimes necessary to flatten it again, which :class:`ungroup` does.

If you want to group text lines by some pattern in them, :class:`~k1lib.cli.grep.grep`
with ``sep=True`` might be better for you.

:param column: which column to group by
:param separate: whether to separate out the column to sort of form a dict or not. See example
:param removeCol: whether to remove the grouped-by column. Defaults to True if
    ``separate=True``, and False if ``separate=False``"""
        self.column = column; self.separate = separate
        if removeCol is None: removeCol = separate # if separate, then remove cols, else don't
        self.removeCol = removeCol
    def __ror__(self, it):
        it = init.dfGuard(it); c = self.column; separate = self.separate; removeCol = self.removeCol
        if not isinstance(it, settings.arrayTypes): it = [list(e) for e in it]
        it = it | cli.sort(c, False); sentinel = object(); it = iter(it); a = [next(it, sentinel)]
        if a[0] is sentinel: return
        v = a[0][c]
        try:
            while True:
                e = next(it)
                if e[c] == v: a.append(e)
                else:
                    if removeCol: a = a | ~cli.cut(c)
                    yield [v, a] if separate else a; a = [e]; v = a[0][c]
        except StopIteration:
            if len(a) > 0:
                if removeCol: a = a | ~cli.cut(c)
                yield [v, a] if separate else a
    def _jsF(self, meta):
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()
        return f"{fIdx} = ({dataIdx}) => {dataIdx}.groupBy({cli.kjs.v(self.column)}, {cli.kjs.v(self.separate)}, {cli.kjs.v(self.removeCol)})", fIdx
class ungroup(BaseCli):
    def __init__(self, single=True, begin=True, insertCol:bool=True):
        """Ungroups things that were grouped using a specific mode of :class:`groupBy`.
Particularly useful to transform some complex data structure into a flat dataframe
so that you can plug it into pandas. Example::

    # returns [[3, 1.2], [3, 3.4], [5, 6], [5, 8], [5, 11]]
    [[3, [1.2, 3.4]], [5, [6, 8, 11]]] | ungroup() | deref()
    # returns [[3, 1.2], [3, 3.4], [5, 6], [5, 8], [5, 11]]
    [[3, [[1.2], [3.4]]], [5, [[6], [8], [11]]]] | ungroup(False) | deref()
    # returns [[1.2, 3], [3.4, 3], [6, 5], [8, 5], [11, 5]]
    [[3, [1.2, 3.4]], [5, [6, 8, 11]]] | ungroup(begin=False) | deref()

:param single: whether the table in each group has a single column or not
:param begin: whether to insert the column at the beginning or at the end. Only works
    if ``insertCol`` is True
:param insertCol: whether to insert the column into the table or not"""
        self.single = single; self.begin = begin; self.insertCol = insertCol
    def __ror__(self, it):
        preprocess = cli.apply(cli.wrapList().all(), 1) if self.single else cli.iden(); begin = self.begin; it = init.dfGuard(it)
        if self.insertCol: return it | preprocess | ~cli.apply(lambda x, arr: arr | cli.insert(x, begin).all()) | cli.joinStreams()
        else: return it | preprocess | cli.cut(1) | cli.joinStreams()
    def _jsF(self, meta):
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()
        return f"{fIdx} = ({dataIdx}) => {dataIdx}.ungroup({cli.kjs.v(self.single)}, {cli.kjs.v(self.begin)}, {cli.kjs.v(self.insertCol)})", fIdx
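# groupBy and ungroup are roughly inverses of each other -- a round-trip sketch
# (illustrative values only):
#   a = [[2.3, 5], [3.4, 2], [4.5, 2]]
#   g = a | groupBy(1, True) | deref()        # -> [[2, [[3.4], [4.5]]], [5, [[2.3]]]]
#   g | ungroup(False, begin=True) | deref()  # -> [[2, 3.4], [2, 4.5], [5, 2.3]]
# That is, `groupBy(c, True)` compresses the duplicated key column away, and
# `ungroup` re-expands it, modulo the sort order that groupBy imposes.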
class insertColumn(BaseCli):
    def __init__(self, column:List[Any], begin=True, fill=""):
        """Inserts a column at the beginning or end.
Example::

    # returns [['a', 1, 2], ['b', 3, 4]]
    [[1, 2], [3, 4]] | insertColumn(["a", "b"]) | deref()
    # returns [[1, 2, 'a'], [3, 4, 'b']]
    [[1, 2], [3, 4]] | insertColumn(["a", "b"], begin=False) | deref()"""
        self.column = column; self.begin = begin; self.fill = fill
        self._f = transpose.wrap(insert(column, begin=begin), fill=fill)
    # conveniently, because transpose() and insert() have _all_array_opt(), this automatically has that property as well! Super nice!
    def __ror__(self, it): return it | self._f
    def _jsF(self, meta): return self._f._jsF(meta)
class insertIdColumn(BaseCli):
    def __init__(self, table=False, begin=True):
        """Inserts an id column at the beginning (or end).
Example::

    # returns [[0, 'a', 2], [1, 'b', 4]]
    [["a", 2], ["b", 4]] | insertIdColumn(True) | deref()
    # returns [[0, 'a'], [1, 'b']]
    "ab" | insertIdColumn()

This has a shorter alias called :class:`insId`

:param table: if False, then pairs each element of the input iterator with its id,
    else treats the input as a full-fledged table and inserts an id column"""
        self.table = table; self.begin = begin
    def _all_array_opt(self, it, level):
        if self.table:
            if level != len(it.shape)-2: return NotImplemented
            else:
                b = (np.arange(it.shape[level]) | cli.repeat(np.prod(it.shape[:level]))).reshape((*it.shape[:level+1], 1))
                return (np if isinstance(it, np.ndarray) else torch).concatenate([b, it] if self.begin else [it, b], -1)
        else:
            if level != len(it.shape)-1: return NotImplemented
            else: return it[(*[slice(None,None,None)]*len(it.shape),None)] | insId(True, self.begin).all(level)
    def __ror__(self, it):
        # if isinstance(it, settings.arrayTypes): return self._all_array_opt(it, 0)
        if isinstance(it, settings.arrayTypes):
            bm = np if isinstance(it, np.ndarray) else torch
            if self.table:
                if len(it.shape) == 2:
                    b = bm.arange(it.shape[0])[:,None]
                    return bm.concatenate([b, it] if self.begin else [it, b], 1)
            else:
                if len(it.shape) == 1:
                    a = it[:,None]; b = bm.arange(len(it))[:,None]
                    return bm.concatenate([b, a] if self.begin else [a, b], 1)
        if self.table:
            if hasPandas and isinstance(it, pd.DataFrame):
                try: return it | T() | insert(range(len(it)), self.begin) | T()
                except: it = init.dfGuard(it)
            if self.begin: return ([i, *e] for i, e in enumerate(it))
            else: return ([*e, i] for i, e in enumerate(it))
        else:
            if hasPandas and isinstance(it, pd.Series):
                if not it.name: it.name = "A"
                try: return pd.DataFrame({"_auto_idx": range(len(it)), it.name: it} if self.begin else {it.name: it, "_auto_idx": range(len(it))})
                except: pass
            if self.begin: return ([i, e] for i, e in enumerate(init.dfGuard(it)))
            else: return ([e, i] for i, e in enumerate(init.dfGuard(it)))
    def _jsF(self, meta):
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()
        return f"{fIdx} = ({dataIdx}) => {dataIdx}.insertIdColumn({cli.kjs.v(self.table)}, {cli.kjs.v(self.begin)})", fIdx
insId = insertIdColumn
def unsqueeze(dim:int=0):
    """Unsqueezes the input iterator.
Example::

    t = [[1, 2], [3, 4], [5, 6]]
    # returns (3, 2)
    t | shape()
    # returns (1, 3, 2)
    t | unsqueeze(0) | shape()
    # returns (3, 1, 2)
    t | unsqueeze(1) | shape()
    # returns (3, 2, 1)
    t | unsqueeze(2) | shape()

Behind the scenes, it's really just ``wrapList().all(dim)``, but the "unsqueeze" name
is a lot more familiar. Also note that the inverse operation "squeeze" is sort of
``item().all(dim)``, if you're sure that's desirable::

    t = [[1, 2], [3, 4], [5, 6]]
    # returns (3, 2)
    t | unsqueeze(1) | item().all(1) | shape()"""
    return cli.wrapList().all(dim)
def oUnsqueeze(cs, ts, metadata): # reminder: if this changes then also change the example in llvm.rst
    a = cs[0]; t = ts[0]; i = 0
    if not isinstance(t, tArrayTypes): return None
    while isinstance(a, cli.apply) and a.normal: i += 1; a = a.f
    if not isinstance(a, cli.wrapList): return None
    t = t.__class__(t.child, t.rank+1 if t.rank is not None else None)
    if isinstance(t, tNpArray): return [cli.aS(lambda x: np.expand_dims(x, i)).hint(t)]
    else: return [cli.aS(lambda x: x.unsqueeze(i)).hint(t)]
tOpt.addPass(oUnsqueeze, [cli.apply], 4)
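# What the oUnsqueeze pass buys you -- a sketch of the rewrite it performs
# (illustrative only): when the typechecker knows the input is an array,
#   wrapList().all(1)                      # pure-Python: wraps every element at depth 1
# gets replaced by the equivalent C-accelerated call
#   aS(lambda x: np.expand_dims(x, 1))     # or x.unsqueeze(1) for torch Tensors
# so `t | unsqueeze(1)` on a (3, 2) array returns a (3, 1, 2) array without ever
# looping in Python.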
class count(BaseCli):
    def __init__(self, max=False):
        """Finds unique elements and returns a table with [frequency, value, percent] columns.
Example::

    # returns [[1, 'a', '33%'], [2, 'b', '67%']]
    ['a', 'b', 'b'] | count() | deref()
    # returns [[1, 'a', '50%'], [2, 'b', '100%']]
    ['a', 'b', 'b'] | count(max=True) | deref()

:param max: if True, percentages are of the max value, else they're of the sum"""
        super().__init__(); self._max = max
    def _typehint(self, inp):
        i = tAny()
        if isinstance(inp, tListIterSet): i = inp.child
        return tIter(tCollection(int, i, str))
    def __ror__(self, it:Iterator[str]):
        it = cli.apply(lambda row: (tuple(row) if isinstance(row, list) else row))(init.dfGuard(it))
        c = Counter(it); s = max(c.values()) if self._max else sum(c.values())
        # has to scan through the entire thing anyway, which is long enough already, so just turn it into a list
        return [[v, k, f"{round(100*v/s)}%"] for k, v in c.items()]
    @staticmethod
    def join():
        """Joins multiple counts together.
Example::

    # returns [[2, 'a', '33%'], [4, 'b', '67%']]
    ['a', 'b', 'b'] | repeat(2) | applyMp(count() | deref()) | count.join() | deref()

This is useful when you want to get the count of a really long list/iterator using
multiple cores"""
        def inner(counts):
            values = defaultdict(lambda: 0)
            for _count in counts:
                if _count is None: continue
                for v, k, *_ in _count:
                    values[k] += v
            s = values.values() | cli.toSum()
            return [[v, k, f"{round(100*v/s)}%"] for k, v in values.items()]
        return cli.applyS(inner)
    def _jsF(self, meta):
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()
        return f"{fIdx} = ({dataIdx}) => {dataIdx}.count()", fIdx
class hist(BaseCli):
    def __init__(self, bins:int=30, dropZero:bool=False):
        """Bins a long 1d array. Effectively creates a histogram, without actually
plotting it. Example::

    np.random.randn(1000) | hist(5)

That returns something like::

    (array([-2.31449761, -1.17406889, -0.03364017,  1.10678854,  2.24721726]),
     array([ 41, 207, 432, 265,  55]),
     1.1404287156493986)

This format goes with :meth:`~matplotlib.pyplot.bar` directly, like this::

    np.random.randn(1000) | hist(10) | ~aS(plt.bar)

If you have tons of data that's handled in multiple processes, but you want to get
an overall histogram, you can do something like this::

    # bad solution, runs slow, accurate
    fileNames | applyMp(cat() | toFloat() | aS(list)) | joinStreams() | hist() | ~aS(plt.bar)
    # good solution, runs fast, slightly inaccurate
    fileNames | applyMp(cat() | toFloat() | hist(300)) | hist.join() | ~aS(plt.bar)

Let's say that in each process you have 10M records, and that you have 1000 processes
in total. In the first solution, you transfer all records (10B records total) to a
single process, then calculate the histogram of them. The transfer overhead is going
to be absolutely enormous, as well as the computation. This really defeats the
purpose of doing multiprocessing. In the second solution, you "convert" 10M records
into 600 numbers for each process, which scales up to 600k numbers for all processes.
Although big, that's certainly manageable with current hardware, so the data transfer
cost is not a lot at all. The histogram merging part also executes relatively fast,
as it only creates an internal array of 3M length. See over :meth:`hist.join` for
param details

:param bins: how many bins should the histogram have?
:param dropZero: if True, will eliminate buckets that have no elements in them"""
        self.bins = bins; self.dropZero = dropZero
    def __ror__(self, it):
        if hasPandas and isinstance(it, pd.DataFrame): it = init.dfGuard(it)
        if not (isinstance(it, settings.arrayTypes) or (hasPandas and isinstance(it, pd.core.arraylike.OpsMixin))): it = list(it)
        y, x = np.histogram(it, bins=self.bins)
        delta = x[1] - x[0]; x = (x[1:] + x[:-1])/2
        if self.dropZero: idx = y > 0; return x[idx], y[idx], delta
        return x, y, delta
    @staticmethod
    def join(scale:float=1e4, bins:int=None, log:bool=True, xlog:bool=False):
        """Joins multiple histograms together.
Example::

    a = np.random.randn(1000); b = np.random.randn(1000)+3
    [a, b] | joinStreams() | hist() | head(2) | ~aS(plt.plot)                                  # ground truth
    [a, b] | hist(300).all() | hist.join(scale=1e4) | head(2) | ~aS(plt.plot)                  # log joining
    [a, b] | hist(300).all() | hist.join(scale=1e4, log=False) | head(2) | ~aS(plt.plot, ".")  # linear joining
    plt.legend(["Ground truth", "Log", "Linear"]); plt.grid(True); plt.ylabel("Frequency"); plt.xlabel("Value")

This results in this:

.. image:: ../images/hist1.png

As you can see, this process is only approximate, but is accurate enough in everyday
use. If you are a normal user, then this is probably enough. However, if you're a
mathematician and really care about the accuracy of this, read on.

.. admonition:: Performance vs accuracy

    As mentioned in :class:`hist`, joining histograms from across processes can
    really speed things up big time. But the joining process is complicated, with
    multiple parameters and different tradeoffs for each config. In this example,
    scale = 1e4, bins = 30, OG bins = 300, log = True. "OG bins" is the number of
    bins coming into :meth:`hist.join`

    To get the best accuracy possible, you should set scale and OG bins high. If
    better performance is desired, you should first lower scale, then lower OG
    bins, then finally lower bins.

.. admonition:: Log scale

    Take a look at this piece of code::

        a, b = np.random.randn(1000)*1, np.random.randn(3000000)*0.3+3
        [a, b] | joinStreams() | hist() | head(2) | ~aS(plt.plot)
        [a, b] | hist(300).all() | hist.join(scale=1e4) | head(2) | ~aS(plt.plot)
        [a, b] | hist(300).all() | hist.join(scale=1e4, log=False) | head(2) | ~aS(plt.plot, ".")
        plt.yscale("log"); plt.legend(["Ground truth", "Log", "Linear"]); plt.grid(True); plt.ylabel("Frequency"); plt.xlabel("Value")

    This results in:

    .. image:: ../images/hist2.png

    This shows how log mode is generally better than linear mode when the
    frequencies span multiple orders of magnitude. So why not delete linear mode
    entirely? Well, I have not formally proved that the log scale case fully covers
    the linear case, although in practice it seems so. So just to be cautious,
    let's leave it in

.. admonition:: Scale

    The setup is just like in the "Log scale" section, but with scale = 1e3 instead
    of the default 1e4:

    .. image:: ../images/hist3.png

    Remember that the higher the scale, the more accurate, but also the longer it
    runs. If the difference between high and low frequencies is bigger than scale,
    then the low frequency values are dropped.

:param scale: how big a range of frequencies do we want to capture?
:param bins: output bins. If not specified, automatically defaults to 1/10 the
    original number of bins
:param log: whether to transform everything to log scale internally on the y axis
:param xlog: whether to transform everything to log scale internally on the x axis"""
        def inner(it):
            it = it | (cli.apply(cli.apply(math.log), 0) if xlog else cli.iden()) | cli.deref()
            _bins = bins if bins is not None else it | cli.cut(0) | cli.shape(0).all() | cli.toMean() | cli.op()/10 | cli.aS(int)
            maxY = max(it | cli.cut(1) | cli.toMax().all() | cli.toMax() | cli.op()/scale, 1e-9)
            if log: it = it | cli.apply(lambda x: np.log(x+1e-9)-math.log(maxY) | cli.aS(np.exp) | cli.aS(np.round) | cli.op().astype(int), 1) | cli.deref()
            else: it = it | cli.apply(lambda y: (y/maxY).astype(int), 1) | cli.deref()
            x, y, delta = it | cli.cut(0, 1) | cli.transpose().all() | cli.joinStreams() | ~cli.apply(lambda a,b: [a]*b) | cli.joinStreams() | cli.aS(list) | cli.aS(np.array) | hist(_bins)
            return x | ((cli.apply(math.exp) | cli.deref()) if xlog else cli.iden()), y*maxY, delta
        return cli.aS(inner)
def _permuteGen(row, pers):
    row = list(row); return (row[i] for i in pers)
class permute(BaseCli):
    def __init__(self, *permutations:List[int]):
        """Permutes the columns. Acts kinda like :meth:`torch.Tensor.permute`.
Example::

    # returns [['b', 'a'], ['d', 'c']]
    ["ab", "cd"] | permute(1, 0) | deref()"""
        super().__init__(); self.permutations = permutations
    def __ror__(self, it:Iterator[str]):
        p = self.permutations
        for row in it: yield _permuteGen(row, p)
    def _jsF(self, meta):
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()
        return f"{fIdx} = ({dataIdx}) => {dataIdx}.permute({cli.kjs.vs(self.permutations) | cli.join(', ')})", fIdx
class AA_(BaseCli):
    def __init__(self, *idxs:List[int], wraps=False):
        """Returns 2 streams, one that has the selected element, and the other the rest.
Example::

    # returns [5, [1, 6, 3, 7]]
    [1, 5, 6, 3, 7] | AA_(1)
    # returns [[5, [1, 6, 3, 7]]]
    [1, 5, 6, 3, 7] | AA_(1, wraps=True)

You can also put multiple indexes through::

    # returns [[1, [5, 6]], [6, [1, 5]]]
    [1, 5, 6] | AA_(0, 2)

If you don't specify anything, then all indexes will be sliced::

    # returns [[1, [5, 6]], [5, [1, 6]], [6, [1, 5]]]
    [1, 5, 6] | AA_()

As for why the strange name, think of this operation as "AĀ". In statistics, say you
have a set "A", then "not A" is commonly written as A with an overline "Ā". So
"AA\_" represents "AĀ", and it first returns the selection A.

:param wraps: if True, then the first example will return [[5, [1, 6, 3, 7]]]
    instead, so that A has the same signature as Ā"""
        super().__init__(); self.idxs = idxs; self.wraps = wraps
    def __ror__(self, it:List[Any]) -> List[List[List[Any]]]:
        super().__ror__(it); idxs = self.idxs; it = list(init.dfGuard(it))
        if len(idxs) == 0: idxs = range(len(it))
        def gen(idx):
            return [it[idx], [v for i, v in enumerate(it) if i != idx]]
        if not self.wraps and len(idxs) == 1: return gen(idxs[0])
        return [gen(idx) for idx in idxs]
[docs]class peek(BaseCli): # peek
[docs]    def __init__(self): # peek
        """Returns (firstRow, iterator). This sort of peeks at the first row, to
        potentially gain some insights about the internal formats. The returned
        iterator is not tampered with. Example::

            e, it = iter([[1, 2, 3], [1, 2]]) | peek()
            print(e) # prints "[1, 2, 3]"
            s = 0
            for e in it: s += len(e)
            print(s) # prints "5", the total length of the 2 lists

        You kinda have to be careful about handling the ``firstRow``, because you might
        inadvertently alter the iterator::

            e, it = iter([iter(range(3)), range(4), range(2)]) | peek()
            e = list(e) # e is [0, 1, 2]
            list(next(it)) # supposed to be the same as `e`, but is [] instead

        This happens because you have already consumed all elements of the first row,
        and thus there aren't any left when you try to call ``next(it)``.""" # peek
        super().__init__() # peek
[docs] def __ror__(self, it:Iterator[Any]) -> Tuple[Any, Iterator[Any]]: # peek if isinstance(it, settings.arrayTypes): # peek try: return it[0], it # peek except: return None, [] # peek if hasPandas: # peek if isinstance(it, pd.DataFrame): # peek try: return it[:1].to_numpy()[0], it # peek except: return None, [] # peek if isinstance(it, pd.core.arraylike.OpsMixin): # peek try: return it[0], it # peek except: return None, [] # peek it = iter(it); sentinel = object(); row = next(it, sentinel) # peek if row is sentinel: return None, [] # peek def gen(): yield row; yield from it # peek return row, gen() # peek
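# A hedged sketch (illustrative only): peeking at the first row to learn the
# table's width, while the returned iterator still yields every row:
#
#   row, it = iter([[1, 2], [3, 4], [5, 6]]) | peek()
#   ncols = len(row)                  # 2, learned from the first row only
#   total = sum(len(r) for r in it)   # 6, since `it` still yields all 3 rows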
[docs]class peekF(BaseCli): # peekF
[docs]    def __init__(self, f:Union[BaseCli, Callable[[Any], Any]]): # peekF
        r"""Similar to :class:`peek`, but will execute ``f(row)`` and return the input
        Iterator, which is not tampered with. Example::

            it = lambda: iter([[1, 2, 3], [1, 2]])
            # prints "[1, 2, 3]" and returns [[1, 2, 3], [1, 2]]
            it() | peekF(lambda x: print(x)) | deref()
            # prints "1\n2\n3"
            it() | peekF(headOut()) | deref()""" # peekF
        super().__init__(fs=[f]); self.f = f # peekF
[docs]    def __ror__(self, it:Iterator[Any]) -> Iterator[Any]: # peekF
        f = self.f # peekF
        if isinstance(it, settings.arrayTypes): # peekF
            try: it[0]; f(it[0]); return it # peekF
            except: return [] # peekF
        if hasPandas: # peekF
            if isinstance(it, pd.DataFrame): # peekF
                try: a = it[:1].to_numpy()[0]; f(a); return it # peekF
                except: return [] # peekF
            if isinstance(it, pd.core.arraylike.OpsMixin): # peekF
                try: it[0]; f(it[0]); return it # peekF
                except: return [] # peekF, returns an empty iterator like the other failure branches
        it = iter(it); sentinel = object(); row = next(it, sentinel) # peekF
        if row is sentinel: return [] # peekF
        def gen(): yield row; yield from it # peekF
        f(row); return gen() # peekF
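# A hedged sketch (illustrative only): logging the first element flowing through
# a pipeline stage without consuming or altering the stream:
#
#   range(5) | peekF(lambda r: print("first element:", r)) | deref()
#   # prints "first element: 0" and returns [0, 1, 2, 3, 4]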
settings.add("repeat", k1lib.Settings().add("infBs", 100, "if dealing with infinite lists, how many elements at a time should be processed?"), "settings related to repeat() and repeatFrom()") # peekF
[docs]class repeat(BaseCli): # repeat
[docs]    def __init__(self, limit:int=None): # repeat
        """Yields the passed-in object a specified number of times. If you intend to
        pass in an iterator, then make a list out of it first, as the second copy of
        the iterator probably won't work, since you will have exhausted it the first
        time around. Example::

            # returns [[1, 2, 3], [1, 2, 3], [1, 2, 3]]
            [1, 2, 3] | repeat(3) | deref()

        If ``limit`` is not specified, then this will repeat indefinitely, which may
        or may not cause a problem downstream. Some downstream operators want to get
        the full view of all elements in the input iterator, but because it's an
        infinite iterator, they will never finish executing and never get the full
        view that they want::

            [1, 2] | repeat() | deref() | headOut()   # will hang indefinitely, fill RAM up and crash after a while
            [1, 2] | (repeat() | deref()) | headOut() # will not hang, and actually print out the first 10 lines, each line is "[1, 2]"

        The solution is to just make a group that starts with ``repeat()``. Then
        ``repeat()`` can capture all downstream operations, batch up the infinite
        iterator, pass each batch into the downstream operations, and merge the
        results together. The batch size can be controlled by
        ``settings.cli.repeat.infBs = 200`` if you need it for some reason.

        See also: :meth:`repeatF`

        :param limit: if None, then repeats indefinitely""" # repeat
        super().__init__(capture=True); self.limit = limit # repeat
def _typehint(self, inp): return tIter(inp) # repeat def _all_array_opt(self, it, level): # repeat if self.limit is None or self.limit == float("inf"): return NotImplemented # repeat sh = it | cli.shape(); s = slice(None, None, None) # repeat it = it[(*[s]*level, None)]; newSh = [*sh[:level], self.limit, *sh[level:]] # repeat if isinstance(it, np.ndarray): return np.broadcast_to(it, newSh) | self.capturedSerial.all(level) # repeat elif hasTorch and isinstance(it, torch.Tensor): return it.expand(newSh) | self.capturedSerial.all(level) # repeat return NotImplemented # repeat
[docs] def __ror__(self, o:Any) -> Iterator[Any]: # repeat limit = self.limit if self.limit != None else k1lib.settings.cli.inf; ser = self.capturedSerial # repeat def gen(): # repeat for i in itertools.count(): # repeat if i >= limit: break # repeat yield o # repeat if limit < float("inf"): return self._all_array_opt(o, 0) if isinstance(o, settings.arrayTypes) else (gen() | ser) # repeat return (gen() | ser) if len(self.capturedClis) == 0 else gen() | cli.batched(settings.repeat.infBs) | cli.apply(ser) | cli.joinStreams() # repeat
def _jsF(self, meta): # repeat fIdx = init._jsFAuto(); dataIdx = init._jsDAuto() # repeat if self.limit is None: raise Exception("repeat._jsF() can't repeat the input array infinitely many times, as there's no iterator concept in JS") # repeat return f"{fIdx} = ({dataIdx}) => {dataIdx}.repeat({cli.kjs.v(self.limit)})", fIdx # repeat
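# A hedged sketch (illustrative only): the finite form, and the grouped infinite
# form that the docstring above recommends for safety:
#
#   [1, 2] | repeat(3) | deref()                       # returns [[1, 2], [1, 2], [1, 2]]
#   [1, 2] | (repeat() | deref()) | head(3) | deref()  # same result, but via a batched infinite stream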
class _repeatF(BaseCli): # _repeatF def __init__(self, limit, kwargs): super().__init__(capture=True); self.limit = limit; self.kwargs = kwargs # _repeatF def __ror__(self, f): # _repeatF f = fastF(f); limit = self.limit; kwargs = self.kwargs; ser = self.capturedSerial # _repeatF limit = limit if limit != None else k1lib.settings.cli.inf # _repeatF def gen(): # _repeatF if len(kwargs) == 0: # _repeatF for i in itertools.count(): # _repeatF if i >= limit: break # _repeatF yield f() # _repeatF else: # _repeatF for i in itertools.count(): # _repeatF if i >= limit: break # _repeatF yield f(**kwargs) # _repeatF if limit < float("inf"): return gen() | ser # _repeatF return (gen() | ser) if len(self.capturedClis) == 0 else gen() | cli.batched(settings.repeat.infBs) | cli.apply(ser) | cli.joinStreams() # _repeatF
[docs]def repeatF(f=None, limit:int=None, **kwargs): # repeatF
    """Yields a specified number of values generated by a specified function. Example::

        repeatF(lambda: 4, 3) | deref()          # returns [4, 4, 4]
        (lambda: 4) | repeatF(limit=3) | deref() # returns [4, 4, 4]
        repeatF(lambda: 4) | head() | shape(0)   # returns 10

        f = lambda a: a+2
        repeatF(f, 3, a=6) | deref()        # returns [8, 8, 8]
        f | repeatF(limit=3, a=6) | deref() # returns [8, 8, 8]

    See :class:`repeat` for a discussion on how to deal with infinite iterators. If
    you want to do something like that, then you have to pipe ``f`` in instead of
    specifying it in the params directly.

    :param f: function to repeat. If None, then you can pipe the function in
    :param limit: if None, then repeats indefinitely
    :param kwargs: extra keyword arguments that you can pass into the function

    See also: :class:`repeatFrom`""" # repeatF
    return _repeatF(limit, kwargs) if f is None else f | _repeatF(limit, kwargs) # repeatF
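# A hedged sketch (illustrative only): unlike repeat(), the function is called
# anew each time, so this yields 3 different random floats:
#
#   import random
#   repeatF(lambda: random.random(), 3) | deref()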
[docs]class repeatFrom(BaseCli): # repeatFrom
[docs]    def __init__(self, limit:int=None): # repeatFrom
        """Yields from a list. If it runs out of elements, it starts over, for
        ``limit`` times in total. Example::

            # returns [1, 2, 3, 1, 2]
            [1, 2, 3] | repeatFrom() | head(5) | deref()
            # returns [1, 2, 3, 1, 2, 3]
            [1, 2, 3] | repeatFrom(2) | deref()

        See :class:`repeat` for a discussion on how to deal with infinite iterators

        .. note::

            This note is for advanced users who want to modify the resulting stream mid-way.

            Because this reuses elements inside the input iterator, it's necessary that
            the input feels like a list and not an iterator. So in order to make this work::

                # returns [1, 2, 3, 1, 2, 3]
                iter([1, 2, 3]) | repeatFrom(2) | deref()

            the input iterator is internally turned into a list. However, sometimes you
            may want to update the input iterator's values mid-stream, so as to make
            things extra dynamic, like this::

                l = [1, 2, 3]
                def g():
                    yield from l; yield from l
                def h():
                    for i, e in enumerate(g()):
                        if i == 3: l.append(5) # modifies the list mid-way
                        yield e
                h() | deref() # returns [1, 2, 3, 1, 2, 3, 5]

            But if you do this, it won't work::

                l = [1, 2, 3]
                def h():
                    for i, e in enumerate(iter(l) | repeatFrom(2)):
                        if i == 3: l.append(5)
                        yield e
                h() | deref() # returns [1, 2, 3, 1, 2, 3]

            This is because internally, :class:`repeatFrom` turns the iterator into a
            list and keeps yielding from that list, and thus won't see the updated
            values. To get the dynamic behavior, you have to make the input feel like
            a list (i.e. something you can get the length of)::

                l = [1, 2, 3]
                def h():
                    for i, e in enumerate(l | repeatFrom(2)): # ---------------- changed section
                        if i == 3: l.append(5)
                        yield e
                h() | deref() # returns [1, 2, 3, 1, 2, 3, 5]

        :param limit: if None, then repeats indefinitely""" # repeatFrom
        super().__init__(capture=True); self.limit = limit # repeatFrom
def _typehint(self, inp): # repeatFrom i = tAny() # repeatFrom if isinstance(inp, tListIterSet): i = inp.child # repeatFrom if isinstance(inp, tArrayTypes): i = inp # repeatFrom return tIter(i) # repeatFrom def _all_array_opt(self, it, level:int): # repeatFrom if self.limit is None or self.limit == float("inf"): return NotImplemented # repeatFrom return it | (repeat(self.limit) | joinStreams()).all(level) | self.capturedSerial.all(level) # repeatFrom
[docs]    def __ror__(self, it:Iterator[Any]) -> Iterator[Any]: # repeatFrom
        limit = self.limit if self.limit is not None else k1lib.settings.cli.inf; ser = self.capturedSerial # repeatFrom, `is not None` so that an explicit limit of 0 isn't treated as infinite
        def gen(it): # repeatFrom
            try: len(it) # repeatFrom
            except: it = list(it) # repeatFrom
            for i in itertools.count(): # repeatFrom
                if i >= limit: break # repeatFrom
                yield from it # repeatFrom
        if limit < float("inf"): # repeatFrom
            if isinstance(it, settings.arrayTypes): return self._all_array_opt(it, 0) # repeatFrom
            if hasPandas: # repeatFrom
                if isinstance(it, pd.DataFrame): return pd.DataFrame({s.name:pd.concat([s]*limit, ignore_index=True) for s in it | T()}) | ser # repeatFrom
                if isinstance(it, pd.Series): return pd.concat([it]*limit, ignore_index=True) | ser # repeatFrom, piped through `ser` like the DataFrame branch, so captured clis still apply
            return gen(it) | ser # repeatFrom
        it = init.dfGuard(it); return (gen(it) | ser) if len(self.capturedClis) == 0 else gen(it) | cli.batched(settings.repeat.infBs) | cli.apply(ser) | cli.joinStreams() # repeatFrom
def _jsF(self, meta): # repeatFrom fIdx = init._jsFAuto(); dataIdx = init._jsDAuto() # repeatFrom if self.limit is None: raise Exception(f"repeatFrom._jsF() can't repeat the input array infinitely many times, as there's no iterator concept in JS") # repeatFrom return f"{fIdx} = ({dataIdx}) => {dataIdx}.repeatFrom({cli.kjs.v(self.limit)})", fIdx # repeatFrom
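# A hedged sketch (illustrative only): cycling through a small dataset for a
# fixed number of "epochs":
#
#   ["a", "b"] | repeatFrom(2) | deref()  # returns ['a', 'b', 'a', 'b']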
def oneHotRow(i, n): ans = [0]*n; ans[i] = 1; return ans # oneHotRow
[docs]class oneHot(BaseCli): # oneHot _groups = {} # oneHot
[docs]    def __init__(self, col, n:int=0, group:str=None, sep:bool=False): # oneHot
        """One-hot encodes some column in a table. Example::

            a = [
                [1, 2, "A"],
                [3, 4, "B"],
                [5, 6, "C"]]
            b = [
                [7, 8, "A"],
                [9, 10, "B"],
                [11, 12, "B"]]
            [*a, *b] | oneHot(2) | deref()
            [*a, *b] | oneHot(2, 3, "abcd") | deref()

        Last 2 statements both return this::

            [[1, 2, 1, 0, 0],
             [3, 4, 0, 1, 0],
             [5, 6, 0, 0, 1],
             [7, 8, 1, 0, 0],
             [9, 10, 0, 1, 0],
             [11, 12, 0, 1, 0]]

        You can also separate the encoded column out like this::

            [*a, *b] | oneHot(2, sep=True) | deref()

        Which returns this::

            [[1, 2, [1, 0, 0]],
             [3, 4, [0, 1, 0]],
             [5, 6, [0, 0, 1]],
             [7, 8, [1, 0, 0]],
             [9, 10, [0, 1, 0]],
             [11, 12, [0, 1, 0]]]

        The natural way to do this is to use it without the ``n`` and ``group``
        parameters. But sometimes, your one-hot encoding is spread across multiple
        datasets in multiple dataloaders, and so the order and length of the encoding
        might not be the same, which will mess up your training process. That's why
        you can specify ``group``, which will share encoding information across all
        :class:`oneHot` clis that have the same group name. If you choose to do this
        then you have to also specify the size of the encoding, because the cli can't
        really infer the size when it potentially has not seen all the data, right?

        :param col: which column to one-hot encode and expand into
        :param n: (optional) total number of different elements
        :param group: (optional) group name
        :param sep: (optional) whether to separate the variable out into its own list""" # oneHot
        self.col = col; self.n = n; self.group = group; self.sep = sep # oneHot
        if (n != 0) != (group is not None): # only one of `n`/`group` was specified, which is invalid # oneHot
            raise Exception("You have to specify both `n` and `group` at the same time if you want to use them") # oneHot
        if group is not None: # oneHot
            if group not in oneHot._groups: # oneHot
                oneHot._groups[group] = dict() # oneHot
            self.d = oneHot._groups[group] # oneHot
        else: self.d = dict() # oneHot
def _typehint(self, inp): # oneHot # TODO # oneHot if isinstance(inp, tListIterSet): # oneHot if isinstance(inp.child, tListIterSet): # oneHot pass # oneHot elif isinstance(inp.child, tListIterSet): # oneHot pass # oneHot pass # oneHot return tIter(tAny()) # oneHot
[docs] def __ror__(self, it): # oneHot c = self.col; d = self.d; n = self.n; sep = self.sep; it = init.dfGuard(it) # oneHot if n == 0: # oneHot it = it | cli.deref(2); n = it | cli.cut(c) | cli.aS(set) | cli.shape(0) # oneHot for row in it: # oneHot e = row[c] # oneHot try: e[0]; len(e) # oneHot except: e = list(e) # oneHot if e not in d: d[e] = oneHotRow(len(d), n) # oneHot if sep: yield [*row[:c], d[e], *row[c+1:]] # oneHot else: yield [*row[:c], *d[e], *row[c+1:]] # oneHot return # oneHot _d = it | cli.cut(c) | cli.aS(set) | cli.sort(None, False) | cli.deref(); n = len(_d) # oneHot _d = _d | insertIdColumn(begin=False) | cli.apply(cli.aS(oneHotRow, n), 1) | transpose() | cli.toDict() # oneHot for row in it: yield [*row[:c], *_d[row[c]], *row[c+1:]] # oneHot
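# A hedged sketch (illustrative only): the separated form keeps the encoding in
# its own cell instead of expanding it into multiple columns:
#
#   [[1, "x"], [2, "y"]] | oneHot(1, sep=True) | deref()
#   # returns [[1, [1, 0]], [2, [0, 1]]]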
[docs]class latch(BaseCli): # latch
[docs]    def __init__(self, timeline, before=True, startTok=None, endTok=None): # latch
        """Latches 1 timeline (B) onto another timeline (A). Example::

            a = [[1, "a"], [2, "b"], [3, "c"], [4, "d"], [5, "e"], [6, "f"], [7, "g"], [8, "h"], [9, "i"]]
            b = [[2.1, "2.1"], [3.1, "3.1"]]
            c = [[4.5, "4.5"], [7.3, "7.3"]]

            a[:5] | latch(b) | deref()        # returns [[1, 'a', None], [2, 'b', None], [3, 'c', '2.1'], [4, 'd', '3.1'], [5, 'e', '3.1']]
            a[:5] | latch(b, False) | deref() # returns [[1, 'a', '2.1'], [2, 'b', '2.1'], [3, 'c', '3.1'], [4, 'd', None], [5, 'e', None]]
            b | latch(a) | deref()            # returns [[2.1, '2.1', 'b'], [3.1, '3.1', 'c']]
            b | latch(a, False) | deref()     # returns [[2.1, '2.1', 'c'], [3.1, '3.1', 'd']]

            # latching timelines that don't overlap
            b | latch(c) | deref()        # returns [[2.1, '2.1', None], [3.1, '3.1', None]]
            b | latch(c, False) | deref() # returns [[2.1, '2.1', '4.5'], [3.1, '3.1', '4.5']]
            c | latch(b) | deref()        # returns [[4.5, '4.5', '3.1'], [7.3, '7.3', '3.1']]
            c | latch(b, False) | deref() # returns [[4.5, '4.5', None], [7.3, '7.3', None]]

            # multiple latches also work fine
            a | latch(b) | latch(c) | deref()

        Final command returns this::

            [[1, 'a', None, None],
             [2, 'b', None, None],
             [3, 'c', '2.1', None],
             [4, 'd', '3.1', None],
             [5, 'e', '3.1', '4.5'],
             [6, 'f', '3.1', '4.5'],
             [7, 'g', '3.1', '4.5'],
             [8, 'h', '3.1', '7.3'],
             [9, 'i', '3.1', '7.3']]

        So essentially, there're 2 timelines, "a" and "b". "a" is usually more densely
        packed than "b". The goal is to merge these 2 timelines together, trying to
        insert b's data points at various points inside a. Try to figure out the
        pattern and behavior from the example.

        This cli makes several assumptions:

        - All timelines inside the argument (if ``a | latch(b)``, then I'm talking
          about b) have only 2 columns: (time, data)
        - Timelines piped in (if ``a | latch(b)``, then I'm talking about a) can have
          multiple columns, but the first column has to be time
        - All timelines are sorted by time already, to avoid resorting

        See also: :class:`~k1lib.cli.utils.syncStepper`

        :param timeline: sparser timeline
        :param before: whether to use timeline B's point from before or after timeline A's point
        :param startTok: token to use for A's samples where B has not come into the picture yet
        :param endTok: token to use for A's samples where B has yielded all of its elements""" # latch
        self.timeline = timeline; self.before = before; self.startTok = startTok; self.endTok = endTok # latch
[docs] def __ror__(self, it): # latch timeline = [[[-float("inf"), self.startTok]], self.timeline, [[float("inf"), self.endTok]]] | cli.joinSt() # latch before = self.before; lastT, lastT_desc = next(timeline); t, t_desc = next(timeline) # latch for row in init.dfGuard(it): # latch while row[0] > t: # latch lastT = t; lastT_desc = t_desc # latch t, t_desc = next(timeline) # latch yield [*row, lastT_desc if before else t_desc] # latch
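# A hedged sketch (illustrative only): annotating a dense sensor stream with the
# most recent message from a sparser status stream, adapted from the docstring:
#
#   readings = [[1, 0.5], [2, 0.7], [3, 0.6]]  # (time, value)
#   statuses = [[1.5, "warmup done"]]          # (time, message)
#   readings | latch(statuses) | deref()
#   # returns [[1, 0.5, None], [2, 0.7, 'warmup done'], [3, 0.6, 'warmup done']]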