Source code for k1lib.cli.filt

# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
"""
This is for functions that cut out specific parts of the table
"""
from typing import Callable, Union, List, overload, Iterator, Any, Set, Tuple
from k1lib.cli.init import BaseCli, fastF; import k1lib.cli.init as init
import k1lib.cli as cli; import k1lib, json, os, math, traceback
from k1lib.cli.typehint import *
import numpy as np; from collections import deque
try: import torch; hasTorch = True
except: hasTorch = False
try: import pandas as pd; pd.core; hasPandas = True
except: hasPandas = False
__all__ = ["filt", "filter_", "inSet", "contains", "empty",
           "isNumeric", "instanceOf",
           "head", "tail", "cut", "rows",
           "intersection", "union", "unique", "breakIf", "mask", "tryout", "resume",
           "trigger", "filtStd"]
settings = k1lib.settings.cli
class filt(BaseCli):
    def __init__(self, predicate:Callable[[Any], bool], column:Union[int, List[int]]=None, catchErrors:bool=False):
        """Filters out elements. Examples::

            # returns [2, 6], grabbing all the even elements
            [2, 3, 5, 6] | filt(lambda x: x%2 == 0) | deref()
            # returns [3, 5], grabbing all the odd elements
            [2, 3, 5, 6] | ~filt(lambda x: x%2 == 0) | deref()
            # returns [[2, 'a'], [6, 'c']], grabbing all the even elements in the 1st column
            [[2, "a"], [3, "b"], [5, "a"], [6, "c"]] | filt(lambda x: x%2 == 0, 0) | deref()
            # throws error, because strings can't mod divide
            [1, 2, "b", 8] | filt(lambda x: x % 2 == 0) | deref()
            # returns [2, 8]
            [1, 2, "b", 8] | filt(lambda x: x % 2 == 0, catchErrors=True) | deref()

        You can also pass in :class:`~k1lib.cli.modifier.op` or a string, for extra intuitiveness and quickness::

            # returns [2, 6]
            [2, 3, 5, 6] | filt(op() % 2 == 0) | deref()
            # returns ['abc', 'a12']
            ["abc", "def", "a12"] | filt(op().startswith("a")) | deref()
            # returns [3, 4, 5, 6, 7, 8, 9]
            range(100) | filt(3 <= op() < 10) | deref()
            # returns [3, 4, 5, 6, 7, 8, 9]
            range(100) | filt("3 <= x < 10") | deref()

        See :class:`~k1lib.cli.modifier.aS` for more details on string mode.

        If you pass in :class:`numpy.ndarray` or :class:`torch.Tensor`, then it will
        automatically use the C-accelerated versions if possible, like this::

            # returns np.array([2, 3, 4]), instead of iter([2, 3, 4])
            np.array([1, 2, 3, 4]) | filt(lambda x: x>=2) | deref()
            # returns [2, 3, 4], instead of np.array([2, 3, 4]), because `math.exp` can't operate on numpy arrays
            np.array([1, 2, 3, 4]) | filt(lambda x: math.exp(x) >= 3) | deref()

        If you need more extensive filtering capabilities involving text, check out :class:`~k1lib.cli.grep.grep`.

        If "filt" is too hard to remember, this cli also has an alias :class:`filter_`
        that kinda mimics Python's ``filter()``.

        :param predicate: function that returns True or False
        :param column: if not specified, then filters elements of the input array, else filters
            the specific column only (or columns, just like in :class:`~k1lib.cli.modifier.apply`)
        :param catchErrors: whether to catch errors in the function or not (reject elements that
            raise errors). Runs slower if enabled though"""
        fs = [predicate]; super().__init__(fs); self.inverted = False; self.preInvFilt = None
        if column:
            ex = Exception(f"Filtering using a function on a negative-indexed column ({column}) is not supported")
            if isinstance(column, int):
                if column < 0: raise ex
            else:
                column = list(column)
                if len([c for c in column if c < 0]): raise ex
        self.f = f = fs[0]; _fP = fastF(f); self.column = column; self._fPArr = None; self.pArr = None
        if catchErrors:
            def g(x):
                try: return _fP(x)
                except: return False
            self.predicate = g
        else: self.predicate = _fP
    def _injectPArr(self, pArr):
        if pArr is not None: self.pArr = pArr; self._fPArr = fastF(pArr)
        return self # explicit vectorized predicate, can be injected by downstream clis
    def __ror__(self, it:Iterator[Any]) -> Iterator[Any]:
        p = self.predicate; c = self.column; fusedPArr = self._fPArr or p
        isPd = hasPandas and isinstance(it, pd.core.arraylike.OpsMixin)
        if c is None:
            if isinstance(it, settings.arrayTypes) or isPd:
                a = (it | cli.apply(p)) if self._fPArr is None else self._fPArr(it)
                try: return it[a]
                except: pass
            return (l for l in it if p(l))
        elif isinstance(c, int):
            if isinstance(it, settings.arrayTypes):
                try: return it[fusedPArr(it[:,c])]
                except: pass
            if isPd:
                try: return it[fusedPArr(it[list(it)[c]])]
                except: pass
            def gen():
                for es in init.dfGuard(it):
                    es = list(es)
                    if c < len(es) and p(es[c]): yield es
            return gen()
        else: # list of ints
            ops = []
            for c_ in c: ops.append(filt(self.predicate, c_, False)._injectPArr(self.pArr))
            return it | cli.serial(*ops)
    def __invert__(self):
        """Negate the condition"""
        if self.inverted: raise Exception("Can't invert filt() twice!")
        def f(s):
            if isinstance(s, settings.arrayTypes):
                res = self.predicate(s) # can cause an exception, but that's ok, as that's the signal telling the code in __ror__ to not pass in array types
                if isinstance(res, settings.arrayTypes): return ~res
            return not self.predicate(s)
        ans = filt(f, self.column); ans.preInvFilt = self; ans.inverted = True; return ans
    def __neg__(self):
        """Also negates the condition"""
        return ~self
    def split(self):
        """Splits the input into positive and negative samples. Example::

            # returns [[0, 2, 4, 6, 8], [1, 3, 5, 7, 9]]
            range(10) | filt(lambda x: x%2 == 0).split() | deref()
            # also returns [[0, 2, 4, 6, 8], [1, 3, 5, 7, 9]], exactly like above
            range(10) | filt(lambda x: x%2 == 0) & filt(lambda x: x%2 != 0) | deref()"""
        f = self.predicate; c = self.column; return filt(f, c) & ~filt(f, c)
    def _jsF(self, meta):
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto(); argIdx = init._jsDAuto(); inverted = False
        if self.inverted: self = self.preInvFilt; inverted = True
        header, _fIdx, _async = k1lib.kast.asyncGuard(k1lib.kast.prepareFunc3(self.f, ("filt", meta)))
        return f"{header}\n{fIdx} = {'async ' if _async else ''}({dataIdx}) => {'await ' if _async else ''}{dataIdx}.filt{'_async' if _async else ''}({'async ' if _async else ''}({argIdx}) => {'!' if inverted else ''}({'await ' if _async else ''}{_fIdx}({argIdx})), {cli.kjs.v(self.column)})", fIdx
filter_ = filt
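# --- Illustrative sketch, not part of the library ---
# The array fast path in filt.__ror__ above boils down to boolean-mask indexing: the
# predicate produces a boolean array (vectorized if an explicit array predicate was
# injected, element-wise otherwise), which is then used to index the original array.
# The helper below is hypothetical; it just mirrors what
# `np.array([1, 2, 3, 4]) | filt(lambda x: x >= 2)` effectively does:
def _filt_array_sketch():
    data = np.array([1, 2, 3, 4])
    keep = data >= 2   # vectorized predicate -> array([False,  True,  True,  True])
    return data[keep]  # array([2, 3, 4]); if this indexing raises, filt falls back to the generator path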
class inSet(filt):
    def __init__(self, values:Set[Any], column:int=None, inverse=False):
        """Filters out lines that are not in the specified set. Example::

            # returns [2, 3]
            range(5) | inSet([2, 8, 3]) | deref()
            # returns [0, 1, 4]
            range(5) | ~inSet([2, 8, 3]) | deref()"""
        if not isinstance(values, (set, dict)): values = set(values)
        super().__init__((lambda l: l in values) if not inverse else (lambda l: l not in values), column)
        self.values = values; self.column = column; self.inverse = inverse
    def __invert__(self): return inSet(self.values, self.column, not self.inverse)
    def _jsF(self, meta):
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto(); setIdx = init._jsDAuto()
        return f"{setIdx} = {json.dumps(list(self.values))};\n{fIdx} = ({dataIdx}) => {dataIdx}.inSet({setIdx}, {cli.kjs.v(self.column)}, {cli.kjs.v(self.inverse)})", fIdx
class contains(filt):
    def __init__(self, s:str, column:int=None, inverse=False):
        """Filters out lines that don't contain the specified substring. Sort of similar to
        :class:`~k1lib.cli.grep.grep`, but this is simpler, and can be inverted. Example::

            # returns ['abcd', '2bcr']
            ["abcd", "0123", "2bcr"] | contains("bc") | deref()"""
        super().__init__((lambda e: s in e) if not inverse else (lambda e: s not in e), column)
        self.s = s; self.column = column; self.inverse = inverse
    def __invert__(self): return contains(self.s, self.column, not self.inverse)
    def _jsF(self, meta):
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto(); sIdx = init._jsDAuto()
        return f"{sIdx} = {json.dumps(self.s)};\n{fIdx} = ({dataIdx}) => {dataIdx}.contains({sIdx}, {cli.kjs.v(self.column)}, {cli.kjs.v(self.inverse)})", fIdx
class empty(BaseCli):
    def __init__(self, reverse=False):
        """Filters out streams that are not empty. Almost always used inverted, but "empty"
        is a short, sweet name that's easy to remember. Example::

            # returns [[1, 2], ['a']]
            [[], [1, 2], [], ["a"]] | ~empty() | deref()

        :param reverse: not intended to be used by the end user. Do ``~empty()`` instead."""
        super().__init__(); self.reverse = reverse
    def __ror__(self, streams:Iterator[Iterator[Any]]) -> Iterator[Iterator[Any]]:
        r = self.reverse
        for stream in streams:
            try:
                item, it = stream | cli.peek()
                if not r:
                    if it == []: yield it
                else:
                    if it != []: yield it
            except StopIteration: pass
    def __invert__(self):
        return empty(not self.reverse)
def isNumeric(column:int=None) -> filt:
    """Filters out a line if that column is not a number. Example::

        # returns [0, 2, '3']
        [0, 2, "3", "a"] | isNumeric() | deref()"""
    def f(v):
        try: float(v); return True
        except ValueError: return False
    return filt(f, column)
def instanceOf(cls:Union[type, Tuple[type]], column:int=None) -> filt:
    """Filters out lines that are not an instance of the given type. Example::

        # returns [2]
        [2, 2.3, "a"] | instanceOf(int) | deref()
        # returns [2, 2.3]
        [2, 2.3, "a"] | instanceOf((int, float)) | deref()"""
    if isinstance(cls, list): cls = tuple(cls)
    return filt(lambda e: isinstance(e, cls), column)
def sliceable(it):
    try: it[:]; len(it); return True
    except: return False
def _head(n, inverted, it):
    it = iter(it)
    if n is None:
        if not inverted: yield from it
        else: return
    elif n >= 0:
        if not inverted:
            for i, line in zip(range(n), it): yield line
        else:
            for i, line in zip(range(n), it): pass
            yield from it
    else:
        if not inverted: # head to -3
            n = abs(n); queue = deque()
            for line in it:
                queue.append(line)
                if len(queue) > n: yield queue.popleft()
        else: yield from deque(it, -n) # -3 to end
class headSplit(BaseCli):
    def __init__(self, n, inverted):
        self.n = n; self.inverted = inverted
        self.fixup = n is None or isinstance(n, float) or n < 0
        self.sliceable = None
    def _all_array_opt(self, it, level):
        n = self.n; inverted = self.inverted
        if n is not None and round(n) != n: n = int(it.shape[level]*n) # fractional head
        sl = tuple([slice(None)]*level); b = it[(*sl, slice(n, None))]; a = it[(*sl, slice(None, n))]
        return (b, a) if inverted else (a, b)
    def __ror__(self, it):
        sliceable_ = self.sliceable; n = self.n; inverted = self.inverted
        if sliceable_ is None: self.sliceable = sliceable_ = sliceable(it)
        it = it if sliceable_ else list(it)
        if self.fixup: # needs to fix n to a more definite value. Just to make it faster
            l = len(it)
            if n is None: return it, []
            if isinstance(n, float): n = int(l*n) # fractional head
            n = (n+l)%l
        return (it[n:], it[:n]) if inverted else (it[:n], it[n:])
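# --- Illustrative sketch, not part of the library ---
# The negative-n branches of _head above rely on collections.deque: head(-3) drops the
# last 3 elements by keeping a small rolling buffer, while ~head(-3) keeps only the last 3
# via deque's maxlen. The two hypothetical helpers below are stand-alone versions of those
# branches (names are made up for illustration):
def _neg_head_sketch(it, n):
    # "head to -n": yield everything except the last n elements
    queue = deque()
    for line in iter(it):
        queue.append(line)
        if len(queue) > n: yield queue.popleft()
def _neg_head_inv_sketch(it, n):
    # "-n to end": keep only the last n elements
    return deque(it, n)
# e.g. list(_neg_head_sketch(range(10), 3)) -> [0, 1, 2, 3, 4, 5, 6]
#      list(_neg_head_inv_sketch(range(10), 3)) -> [7, 8, 9]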
class tail(BaseCli):
    def __init__(self, n:int=10):
        """Basically an inverted :class:`head`. Examples::

            range(10) | tail(3) | deref() # returns [7, 8, 9]"""
        self.n = n
    def __ror__(self, it): return it | ~head(-self.n)
    def _jsF(self, meta):
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto(); argIdx = init._jsDAuto()
        return f"{fIdx} = ({dataIdx}) => {dataIdx}.head(-({self.n}), true)", fIdx
class lazyList:
    def __init__(self, it):
        self.it = iter(it); self.elems = []
    def __getitem__(self, idx):
        elems = self.elems; it = self.it
        for _ in range(len(elems)-1, idx): elems.append(next(it))
        return elems[idx]
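# --- Illustrative sketch, not part of the library ---
# lazyList only pulls from the underlying iterator up to the largest index requested so
# far, which is why rows(...) can stay O(1) in space on huge or expensive streams. A
# hypothetical demonstration:
def _lazy_list_sketch():
    ll = lazyList(iter(range(10**9)))  # materializing this with list() would be hopeless
    return ll[2], ll[5]                # -> (2, 5); only 6 elements were ever consumed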
class rows(BaseCli):
    def __init__(self, *rows:List[int]):
        """Selects specific elements given an iterator of indexes. Space complexity is O(1),
        as a list is not constructed (unless you're slicing it in a really weird way). Example::

            "0123456789" | rows(2) | toList() # returns ["2"]
            "0123456789" | rows(5, 8) | toList() # returns ["5", "8"]
            "0123456789" | rows()[2:5] | toList() # returns ["2", "3", "4"]
            "0123456789" | ~rows()[2:5] | toList() # returns ["0", "1", "5", "6", "7", "8", "9"]
            "0123456789" | ~rows()[:7:2] | toList() # returns ['1', '3', '5', '7', '8', '9']
            "0123456789" | rows()[:-4] | toList() # returns ['0', '1', '2', '3', '4', '5']
            "0123456789" | ~rows()[:-4] | toList() # returns ['6', '7', '8', '9']

        Why is it called "rows"? Because I couldn't find a good name for it. There was
        :class:`cut`, which is the name of an actual bash cli that selects out columns given
        indices. When I needed a way to do what this cli does, it was in the context of
        selecting out rows, so the name stuck.

        If you want to just pick out the nth item from the iterator, instead of doing this::

            iter(range(10)) | rows(3) | item() # returns 3

        ... you can use the shorthand :class:`~k1lib.cli.utils.rItem` instead::

            iter(range(10)) | rItem(3) # returns 3

        :param rows: ints for the row indices"""
        if len(rows) == 1 and isinstance(rows[0], slice): self.slice = rows[0]; self.rows = None; self.idxMode = False
        else: self.slice = None; self.rows = rows; self.sortedRows = sorted(rows); self.idxMode = True
        self.inverted = False
    def __getitem__(self, _slice):
        start, stop, step = _slice.start, _slice.stop, _slice.step
        if step == None or step == 1:
            if start == None and stop == None: return cli.iden()
            if start == None: return head(stop)
            if stop == None: return ~head(start)
        elif step == 0: return cli.ignore()
        answer = rows(_slice); answer.inverted = self.inverted; return answer
    def _all_array_opt(self, it, level:int):
        a = np.array(self.rows) if self.rows else self.slice; s = [slice(None, None, None)]*level
        if self.inverted: mask = np.ones(it.shape[level], dtype=bool); mask[a] = False; return it[(*s, mask)]
        return it[(*s, a)]
    def __invert__(self): self.inverted = not self.inverted; return self
    def __ror__(self, it:Iterator[str]):
        idxMode = self.idxMode; inverted = self.inverted; sl = self.slice; rw = self.rows
        if hasPandas and isinstance(it, pd.core.frame.DataFrame):
            if sl is not None: return it.iloc[sl] if not inverted else it.iloc[np.array(list(set(range(len(it))) - set(range(len(it))[sl])))]
            else: return it.iloc[list(rw)] if not inverted else it.iloc[np.array(list(set(range(len(it))) - set(rw)))]
        def gen(it):
            if not inverted:
                if idxMode:
                    if len(self.sortedRows) == 0: return
                    it = list(it) if self.sortedRows[0] < 0 else lazyList(it)
                    for idx in rw: yield it[idx]
                else: yield from list(it)[sl]
            else:
                it = list(it); n = len(it); idxs = set((e if e >= 0 else n+e) for e in rw) if idxMode else set(range(n)[sl])
                yield from (e for i, e in enumerate(it) if i not in idxs)
        if isinstance(it, settings.arrayTypes):
            a = np.array(rw) if rw else sl
            if inverted: mask = np.ones(len(it), dtype=bool); mask[a] = False; return it[mask]
            else: return it[a]
        return gen(it)
    def _jsF(self, meta):
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()
        if not self.slice is None: raise Exception("rows._jsF() doesn't support slice-based indexing yet")
        if self.inverted: raise Exception("rows._jsF() doesn't support inversion yet")
        return f"{fIdx} = ({dataIdx}) => {dataIdx}.rows({cli.kjs.vs(self.rows) | cli.join(', ')})", fIdx
class cut(BaseCli):
    def __init__(self, *columns:List[int]):
        """Cuts out specific columns, sliceable. Examples::

            ["0123456789", "abcdefghij"] | cut(5, 8) | deref() # returns [['5', '8'], ['f', 'i']]
            ["0123456789", "abcdefghij"] | cut(8, 5) | deref() # returns [['8', '5'], ['i', 'f']], demonstrating permutation-safe
            ["0123456789"] | cut(5, 8) | deref() # returns [['5', '8']]
            ["0123456789"] | cut(8, 5) | deref() # returns [['8', '5']], demonstrating permutation-safe
            ["0123456789", "abcdefghij"] | cut(2) | deref() # returns ['2', 'c'], instead of [['2'], ['c']] as usual
            ["0123456789"] | cut(2) | deref() # returns ['2']
            ["0123456789"] | cut(5, 8) | deref() # returns [['5', '8']]
            ["0123456789"] | ~cut()[:7:2] | deref() # returns [['1', '3', '5', '7', '8', '9']]

        In the first example, you can imagine that we're operating on this table::

            0123456789
            abcdefghij

        Then, we want to grab the 5th and 8th columns (0-indexed), which forms this table::

            58
            fi

        So, the result of that is just ``[['5', '8'], ['f', 'i']]``.

        In the ``cut(2)`` examples, if you're only cutting out 1 column, then it will just
        grab that column directly, instead of putting it in a list.

        If you pass in :class:`numpy.ndarray` or :class:`torch.Tensor`, then it will
        automatically use the C-accelerated versions, like this::

            torch.randn(4, 5, 6) | cut(2, 3)  # returns tensor of shape (4, 2, 6)
            torch.randn(4, 5, 6) | cut(2)     # returns tensor of shape (4, 6)
            torch.randn(4, 5, 6) | ~cut()[2:] # returns tensor of shape (4, 2, 6)

        .. warning::

            TL;DR: inverted negative indexes are a bad thing when rows don't have the same number of elements

            Everything works fine when all of your rows have the same number of elements.
            But things might behave a little strangely if they don't. For example::

                # returns [['2', '3', '4'], ['2', '3', '4', '5', '6', '7']]. Different number of columns, works just fine
                ["0123456", "0123456789"] | cut()[2:-2] | deref()
                # returns [['0', '1', '8', '9'], ['a', 'b', 'i', 'j']]. Same number of columns, works just fine
                ["0123456789", "abcdefghij"] | ~cut()[2:-2] | deref()
                # returns [['0', '1', '5', '6'], ['0', '1', '5', '6', '7', '8', '9']]. Different number of columns, unsupported invert case
                ["0123456", "0123456789"] | ~cut()[2:-2] | deref()

            Why does this happen? It peeks at the first row and determines that ~[2:-2] is
            equivalent to [:2] and [5:] combined, not [:2] and [-2:] combined. When applied
            to the second row, [5:] covers indices 5 through 9, hence the result. Another
            edge case would be::

                # returns [['0', '1', '2', '3', '5', '6'], ['0', '1', '2', '3', '5', '6', '7', '8', '9']]
                ["0123456", "0123456789"] | ~cut(-3) | deref()

            Like before, it peeks at the first row and translates ~(-3) into ~4, which is
            equivalent to [:4] and [5:]. But when applied to the second row, it now carries
            the meaning ~4, instead of ~(-3).

            Why don't I just fix these edge cases? Because the run time for it would be
            completely unacceptable, as we'd have to figure out which columns to include in
            the result for every row. This could easily be O(n^3). Of course, with more time
            optimizing, this could be solved, but this is the only extreme edge case and I
            don't feel like putting in the effort to optimize it."""
        super().__init__()
        if len(columns) == 1 and isinstance(columns[0], slice): columns = columns[0] # columns is either a slice object, or a list of ints
        self.columns = columns; self.inverted = False # columns: list[int] | slice
    def _all_array_opt(self, it, level):
        c = self.columns; r = rows(c) if isinstance(c, slice) else rows(*c)
        if self.inverted: r = ~r
        it = it | r.all(level+1); return (it | cli.item().all(level+1)) if not isinstance(c, slice) and len(c) == 1 else it
    def __ror__(self, it):
        columns = self.columns; inverted = self.inverted
        if hasPandas and isinstance(it, pd.core.frame.DataFrame):
            itt = (it | cli.T()); n = len(itt); colNames = list(it)
            if isinstance(columns, slice):
                columns = list(sorted(set(range(n)) - set(range(n)[self.columns]))) if self.inverted else range(len(colNames))[columns]
                return it[[colNames[x] for x in columns]]
            else:
                columns = list(sorted(set(range(n)) - set(self.columns))) if self.inverted else self.columns
                return it[colNames[columns[0]]] if len(columns) == 1 and not inverted else it[[colNames[x] for x in columns]]
        isArray = isinstance(it, settings.arrayTypes)
        if isArray: nCols = len(it[0]); prs = rs = range(nCols) # range(nColumns). "prs" for padded rs
        else: # carefully peeking at the first row to get the number of columns
            it = iter(it); sentinel = object(); row = next(it, sentinel)
            if row is sentinel: return []
            row = list(row); it = it | cli.insert(row); nCols = len(row)
            rs = range(nCols); prs = range(nCols+20) # 20 for longer rows below. Also "rs" is not a great name, deal with it
        if isinstance(columns, slice):
            if not inverted: return it[:,columns] if isArray else (row[columns] for row in (list(row) for row in it))
            columns = list(set(rs[columns]))
        columns = [e if e >= 0 else nCols + e for e in columns] # clear negative indices
        if self.inverted: columns = list(set(e for e in prs if e not in columns))
        if len(columns) == 1: c = columns[0]; return it[:,c] if isArray else (r[c] for r in (list(row) for row in it) if len(r) > c)
        else: return it[:,columns] if isArray else ([row[c] for c in columns if c < len(row)] for row in (list(row) for row in it))
    def __getitem__(self, idx): answer = cut(idx); answer.inverted = self.inverted; return answer
    def __invert__(self): self.inverted = not self.inverted; return self
    def _jsF(self, meta):
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()
        if isinstance(self.columns, slice): raise Exception("cut._jsF() doesn't support slice-based indexing yet")
        return f"{fIdx} = ({dataIdx}) => {dataIdx}.{'cutInv' if self.inverted else 'cut'}({cli.kjs.vs(self.columns) | cli.join(', ')})", fIdx
class intersection(BaseCli):
    def __init__(self, column=None, full=False):
        """Returns the intersection of multiple streams. Example::

            # returns set([2, 4, 5])
            [[1, 2, 3, 4, 5], [7, 2, 4, 6, 5]] | intersection()
            # returns ['2g', '4h', '5j']
            [["1a", "2b", "3c", "4d", "5e"], ["7f", "2g", "4h", "6i", "5j"]] | intersection(0) | deref()

        If you want the full distribution, meaning the intersection, as well as what's left
        of each stream, you can do something like this::

            # returns [{2, 4, 5}, [1, 3], [7, 6]]
            [[1, 2, 3, 4, 5], [7, 2, 4, 6, 5]] | intersection(full=True) | deref()

        :param column: what column to apply the intersection on. Defaulted to None
        :param full: if specified, return the full distribution, instead of the intersection alone"""
        super().__init__(); self.column = column
        self.full = full
        self.f = intersection(column, False) if full else None
    def _typehint(self, inp):
        if self.column is None:
            if isinstance(inp, tArrayTypes): return tSet(inp.child)
            if isinstance(inp, tListIterSet):
                if isinstance(inp.child, tListIterSet):
                    return tSet(inp.child.child)
                return tSet(tAny())
            if isinstance(inp, tCollection):
                a = inp.children[0]
                for e in inp.children:
                    if not isinstance(e, tListIterSet): return tSet(tAny())
                    if e.child != a.child: return tSet(tAny())
                return tSet(a.child)
            return tSet(tAny())
        else: return tAny()
    def __ror__(self, its:Iterator[Iterator[Any]]) -> Set[Any]:
        c = self.column; full = self.full; f = self.f
        if full:
            if c is None:
                its = its | cli.deref(2); inter = its | f
                return [inter, *((e for e in it if e not in inter) for it in its)]
            else: raise Exception("intersection(int, True) mode not supported yet, as it's a little ambiguous what the use case is, and there are many styles of functionality that this can take on")
        if c is None:
            answer = None
            for it in its:
                if answer is None: answer = set(it); continue
                answer = answer.intersection(it)
            return set() if answer is None else answer
        else:
            its = its | cli.deref(2); ans = {}
            ids = its | cut(c).all() | intersection() | cli.aS(set)
            for it in its:
                for row in it:
                    e = row[c]
                    if e in ids: ans[e] = row
            return ans.values()
    def _jsF(self, meta):
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()
        if not self.column is None: raise Exception("intersection._jsF() doesn't support targeting specific .column yet")
        if not self.full: raise Exception("intersection._jsF() doesn't support .full yet")
        return f"{fIdx} = ({dataIdx}) => {dataIdx}.intersection()", fIdx
class union(BaseCli):
    def __init__(self):
        """Returns the union of multiple streams. Example::

            # returns {0, 1, 2, 10, 11, 12, 13, 14}
            [range(3), range(10, 15)] | union()
        """
        super().__init__()
    def _typehint(self, inp):
        return intersection()._typehint(inp)
    def __ror__(self, its:Iterator[Iterator[Any]]) -> Set[Any]:
        answer = set()
        for it in its: answer = set.union(answer, set(it))
        return answer
    def _jsF(self, meta):
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()
        return f"{fIdx} = ({dataIdx}) => {dataIdx}.union()", fIdx
class unique(BaseCli):
    def __init__(self, column:int=None):
        """Filters out non-unique row elements, keeping only the first occurrence. Example::

            # returns [[1, "a"], [2, "a"]]
            [[1, "a"], [2, "a"], [1, "b"]] | unique(0) | deref()
            # returns [0, 1, 2, 3, 4]
            [*range(5), *range(3)] | unique() | deref()

        In the first example, the 3rd element's first column is 1, which has already
        appeared, so it is filtered out.

        :param column: the column to detect unique elements on. Can be None, which will
            behave like converting the input iterator into a set, but this cli will
            maintain the order"""
        super().__init__(); self.column = column
    def __ror__(self, it):
        c = self.column
        if c is None:
            if isinstance(it, settings.arrayTypes): bm = np if isinstance(it, np.ndarray) else (torch if hasTorch and isinstance(it, torch.Tensor) else None); return bm.unique(it)
            def gen():
                terms = set()
                for e in it:
                    if e not in terms: yield e
                    terms.add(e)
        else:
            def gen():
                terms = set()
                for row in it:
                    row = list(row); e = row[c]
                    if e not in terms: yield row
                    terms.add(e)
        return gen()
    def _jsF(self, meta):
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()
        return f"{fIdx} = ({dataIdx}) => {dataIdx}.unique({cli.kjs.v(self.column)})", fIdx
class breakIf(BaseCli):
    def __init__(self, f, col:int=None):
        """Breaks the input iterator if a condition is met. Example::

            # returns [0, 1, 2, 3, 4, 5]
            [*range(10), 2, 3] | breakIf(lambda x: x > 5) | deref()
            # returns [[1, 'a'], [2, 'b']]
            [[1, "a"], [2, "b"], [3, "c"], [2, "d"], [1, "e"]] | breakIf("x > 2", 0) | deref()

        :param col: column to apply the condition on"""
        fs = [f]; super().__init__(fs); self.f = fs[0]; self._fC = fastF(self.f); self.col = col
    def _typehint(self, inp):
        if isinstance(inp, tListIterSet): return tIter(inp.child)
        return tIter(tAny())
    def __ror__(self, it:Iterator[Any]) -> Iterator[Any]:
        f = self._fC; col = self.col
        if hasPandas and isinstance(it, pd.core.arraylike.OpsMixin):
            ndim = len(it | cli.shape())
            a = init.preprocessPd(it, col, lambda x: bool(f(x)), lambda x: f(x).astype(bool))
            return it[:a.argmax()] if a[a.argmax()] else it
        def gen():
            if col is None:
                for line in it:
                    if f(line): break
                    yield line
            else:
                for row in it:
                    if f(row[col]): break
                    yield row
        return gen()
    def _jsF(self, meta):
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto(); argIdx = init._jsDAuto()
        header, _fIdx = k1lib.kast.prepareFunc3(self.f, ("breakIf", meta))
        return f"{header}\n{fIdx} = ({dataIdx}) => {dataIdx}.breakIf(({argIdx}) => {_fIdx}({argIdx}))", fIdx
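# --- Illustrative sketch, not part of the library ---
# For col=None, breakIf's generator above behaves like itertools.takewhile with the
# predicate negated: it yields elements until the condition first becomes true, then stops.
# Hypothetical comparison (function name made up for illustration):
def _break_if_sketch():
    import itertools
    data = [*range(10), 2, 3]
    a = list(data | breakIf(lambda x: x > 5))              # [0, 1, 2, 3, 4, 5]
    b = list(itertools.takewhile(lambda x: x <= 5, data))  # same elements
    return a, b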
class mask(BaseCli):
    def __init__(self, mask:Iterator[bool]):
        """Masks the input stream. Example::

            # returns [0, 1, 3]
            range(5) | mask([True, True, False, True, False]) | deref()
            # returns [2, 4]
            range(5) | ~mask([True, True, False, True, False]) | deref()
            # returns torch.tensor([0, 1, 3])
            torch.tensor(range(5)) | mask([True, True, False, True, False])"""
        super().__init__(); self.mask = mask; self.inverted = False
    def __invert__(self): res = mask(self.mask); res.inverted = not self.inverted; return res
    def __ror__(self, it):
        if self.inverted:
            if isinstance(it, settings.arrayTypes): return it[[not e for e in self.mask]]
            if hasPandas and isinstance(it, pd.core.arraylike.OpsMixin): return it[[not e for e in self.mask]]
            return (e for e, m in zip(it, self.mask) if not m)
        else:
            if isinstance(it, settings.arrayTypes): return it[list(self.mask)]
            if hasPandas and isinstance(it, pd.core.arraylike.OpsMixin): return it[list(self.mask)]
            return (e for e, m in zip(it, self.mask) if m)
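# --- Illustrative sketch, not part of the library ---
# For array types, mask() above is plain boolean indexing with the given mask, and ~mask()
# indexes with the element-wise negation. Hypothetical demonstration using numpy:
def _mask_sketch():
    data = np.arange(5); m = [True, True, False, True, False]
    return data[list(m)], data[[not e for e in m]]  # (array([0, 1, 3]), array([2, 4]))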
class tryout(BaseCli):
    def __init__(self, result=None, retries=0, mode="result"):
        """Wraps every cli operation after this in a try-catch block, returning ``result``
        if the operation fails. Example::

            # returns 9
            3 | (tryout("failed") | op()**2)
            # returns "failed", instead of raising an exception
            "3" | (tryout("failed") | op()**2)
            # special mode: returns "unsupported operand type(s) for ** or pow(): 'str' and 'int'"
            "3" | (tryout(mode="str") | op()**2)
            # special mode: returns entire trace stack (do `import traceback` first)
            "3" | (tryout(mode="traceback") | op()**2)
            # special mode: returns "3", the input of the tryout() block
            "3" | (tryout(mode="input") | op()**2)

        By default, this ``tryout()`` object will gobble up all clis behind it and wrap them
        inside a try-catch block. This might be undesirable, so you can stop it early::

            # returns "failed"
            3 | (tryout("failed") | op()**2 | aS(str) | op()**2)
            # raises an exception, because it errors out after the tryout()-captured operations
            3 | (tryout("failed") | op()**2) | aS(str) | op()**2

        In the first example, :class:`tryout` will catch any errors happening within ``op()**2``,
        ``aS(str)`` or the second ``op()**2``. In the second example, :class:`tryout` will only
        catch errors happening within the first ``op()**2``.

        .. admonition:: Array mode

            The above works well for atomic operations, but not for looping operations. Let's
            say we have this function::

                counter = 0
                def f(x):
                    global counter
                    if x > 5:
                        counter += 1
                        if counter < 3: raise Exception(f"random error: {x}")
                    return x**2

            This code will throw an error the first and second time x is greater than 5 (but
            runs smoothly after that. It's a really nasty function, I know). Capturing like
            this will work::

                counter = 0
                # line below returns [0, 1, 4, 9, 16, 25, 'failed', 'failed', 64, 81]
                range(10) | apply(tryout("failed") | aS(f)) | deref()

            But capturing like this won't work::

                counter = 0
                # line below throws an exception
                range(10) | (tryout("failed") | apply(f)) | deref()

            The reason being, :class:`tryout` will only capture errors when the data is passed
            into ``apply(f)``, and won't capture them later on. However, when data is passed to
            ``apply(f)``, it hasn't executed anything yet (remember these things are lazily
            executed). So the exception actually happens when you're trying to ``deref()`` it,
            which lies outside of :class:`tryout`'s reach. You can just put a tilde in front to
            tell it to capture errors for individual elements in the iterator::

                counter = 0
                # line below returns [0, 1, 4, 9, 16, 25, 'failed', 'failed', 64, 81]
                range(10) | (~tryout("failed") | apply(f)) | deref()

            This mode has a weird quirk: there has to be a 1-to-1 correspondence between the
            input and output of the block of code that it wraps around. Meaning this is okay::

                def g(x):
                    global counter
                    if 40 > x[0] >= 30:
                        counter += 1
                        if counter < 5: raise Exception("random error")
                    return x
                counter = 0
                # returns 50, corrects errors as if they're not even there!
                range(50) | (~tryout(None, 6) | batched(10, True) | apply(g) | joinStreams()) | deref() | shape(0)

            This is okay because going in, there're 50 elements, and it's expected that 50
            elements go out of :class:`tryout`. The input can be of infinite length, but there
            has to be a 1-to-1 relationship between the input and output. While this is not okay::

                counter = 0
                # returns 75, data structure corrupted
                range(50) | (~tryout(None, 6) | batched(10, True) | apply(g) | joinStreams() | batched(2, True)) | joinStreams() | deref() | shape(0)

            It's not okay because it's expected that 25 pairs of elements go out of :class:`tryout`.

        .. admonition:: Retrying

            There's also the ``retries`` parameter, which specifies how many times this class
            should retry the operation before actually returning the predefined result::

                counter = 0
                # line below returns [0, 1, 4, 9, 16, 25, None, None, 64, 81]
                range(10) | (~tryout(retries=0) | apply(f)) | deref()
                counter = 0
                # line below returns [0, 1, 4, 9, 16, 25, None, 49, 64, 81]
                range(10) | (~tryout(retries=1) | apply(f)) | deref()
                counter = 0
                # line below returns [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
                range(10) | (~tryout(retries=2) | apply(f)) | deref()

        :param result: result to return if there is an exception
        :param mode: if "result" (default), returns the result if there's an exception. If "str"
            then returns the exception's string. If "input" then returns the original input. If
            "traceback" then returns the exception's traceback
        :param retries: how many times to retry before giving up?"""
        super().__init__(capture=True); self.result = result; self.inverted = False; self.retries = retries; self.mode = mode
    def __ror__(self, it):
        retries = self.retries; result = self.result; mode = self.mode; it = init.dfGuard(it)
        if len(self.capturedClis) == 0: raise Exception("tryout() currently does not wrap around any other cli. You may need to change `data | tryout() | cli1() | cli2()` into `data | (tryout() | cli1() | cli2())`")
        if not self.inverted: # single mode
            while True:
                try: return it | self.capturedSerial
                except Exception as e:
                    if retries <= 0: return str(e) if mode == "str" else (it if mode == "input" else (traceback.format_exc() if mode == "traceback" else result))
                    retries -= 1
        else: # array mode
            def gen(it):
                patience = retries; savedInputs = k1lib.Wrapper(deque()); ogInp = None
                def interceptIt(it):
                    for e in it: savedInputs().append(e); yield e
                it = iter(it); ogIt = it; it = interceptIt(it); outIt = it | self.capturedSerial
                while True:
                    try: e = next(outIt); yield e; savedInputs().popleft(); patience = retries
                    except StopIteration: break
                    except Exception as e:
                        if patience <= 0: ogInp = savedInputs().popleft(); patience = retries # ran out of patience, so gonna just return the canned result instead
                        else: patience -= 1
                        # restart the loop
                        it = interceptIt([list(savedInputs()), ogIt] | cli.joinStreams())
                        savedInputs.value = deque(); outIt = it | self.capturedSerial
                        if patience == retries:
                            yield str(e) if mode == "str" else (ogInp if mode == "input" else (traceback.format_exc() if mode == "traceback" else result)) # just reset
            return gen(it)
    def __invert__(self): self.inverted = not self.inverted; return self
def resume(fn):
    """Resumes a long-running operation. I usually have code that looks like this::

        def f(x): pass # long running, expensive calculation
        ls(".") | applyMp(f) | apply(dill.dumps) | file("somefile.pth") # executing
        cat.pickle("somefile.pth") | aS(list) # getting all of the saved objects

    This will read all the files in the current directory and transform them using the
    long-running, expensive function, potentially in multiple processes. Then the results
    are serialized (turned into bytes) and appended to an output file.

    What's frustrating is that I do stupid things all the time, so the process usually gets
    interrupted. But I don't want to redo the existing work, so that's where this cli comes
    into play. Now it looks like this instead::

        ls(".") | resume("somefile.pth") | applyMp(f) | apply(dill.dumps) >> file("somefile.pth")

    Note that we're inserting a resume() AND changing the file write mode to append, so that
    the file doesn't get overwritten. Internally, this is just a shorthand for
    ``~head(fn | (tryout(0) | aS(cat.pickle) | shape(0)))``

    :param fn: file name"""
    return ~cli.head(fn | (cli.tryout(0) | cli.aS(cli.cat.pickle) | cli.shape(0)))
class trigger(BaseCli): # yields whenever there's a delta
    def __init__(self, col=None, before=False):
        """Yields elements whenever the requested column changes in value. Example::

            # returns [1, 2, 3, 4, 2]
            [1, 1, 1, 2, 2, 3, 4, 4, 2, 2] | trigger() | cli.deref()
            data = [[1, 0], [2, 1], [2, 2], [1, 3], [1, 4]]
            # returns [[1, 0], [2, 1], [1, 3]]
            data | trigger(0) | deref()
            # returns [[1, 0], [2, 2], [1, 4]], note how on a trigger boundary, this returns the previous ("before") value, not the next value
            data | trigger(0, True) | deref()
            data = [[1, 0], [2, 2], [2, 2], [1, 3], [1, 4]]
            # returns [[1, 0], [2, 2], [1, 3], [1, 4]]
            data | trigger([0, 1]) | deref()

        This takes inspiration from electrical engineering, where a capacitor is essentially a
        "trigger detector". If the input voltage changes too rapidly, the capacitor will make
        the output voltage spike up or down dramatically, as if it's taking the derivative of
        the input signal. Slightly related is the Schmitt trigger.

        Anyway, this cli will track whether a column changes in value, and if it does, yields
        the change, else ignores it. This is useful when processing time series data, to discard
        adjacent messages that are effectively identical and carry no extra information.

        The parameter ``col`` can be one of these 3 types:

        - None (default): considers the whole element/row for the value change signal
        - int: considers a single column for the value change signal
        - list[int]: considers a tuple of the selected columns for the value change signal

        Related, but not precisely the same, operations include :class:`~k1lib.cli.filt.unique`
        and ``aS(set)``.

        :param col: column to trigger on
        :param before: whether to take the previous or the next value when it changes. Defaulted
            to the next value (False)"""
        self.col = col; self.before = before
    def __ror__(self, it):
        it = init.dfGuard(it); empty = object() # random sentinel
        col = self.col; before = self.before; lastValue = empty; lastRow = empty
        if col is not None:
            if isinstance(col, int): # single column
                if before:
                    for row in it:
                        row = list(row); value = row[col]
                        if value != lastValue:
                            if lastRow is not empty: yield lastRow
                        lastRow = row; lastValue = value
                    yield row
                else:
                    for row in it:
                        row = list(row); value = row[col]
                        if value != lastValue: yield row; lastRow = row; lastValue = value
            else: # list[int], multiple columns considered as 1
                if before:
                    for row in it:
                        row = list(row); value = tuple(row[i] for i in col)
                        if value != lastValue:
                            if lastRow is not empty: yield lastRow
                        lastRow = row; lastValue = value
                    yield row
                else:
                    for row in it:
                        row = list(row); value = tuple(row[i] for i in col)
                        if value != lastValue: yield row; lastRow = row; lastValue = value
        else: # takes the whole thing in
            if before:
                for value in it:
                    if value != lastValue:
                        if lastValue is not empty: yield lastValue
                    lastValue = value
                yield value
            else:
                for value in it:
                    if value != lastValue: yield value; lastValue = value
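# --- Illustrative sketch, not part of the library ---
# With col=None and before=False, trigger() above keeps the first element of each run of
# equal values, which is exactly what itertools.groupby's keys give you. Hypothetical
# comparison (function name made up for illustration):
def _trigger_sketch():
    import itertools
    data = [1, 1, 1, 2, 2, 3, 4, 4, 2, 2]
    a = list(data | trigger())                   # [1, 2, 3, 4, 2]
    b = [k for k, _ in itertools.groupby(data)]  # same result
    return a, b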
class filtStd(BaseCli):
    def __init__(self, col:int=None, std:float=2, N:int=1):
        """Filters out values that are outside the specified number of standard deviations. Example::

            data = [*np.random.randn(100), *np.random.randn(10)*10] | randomize(None) | deref()
            data | filtStd(std=2) | shape(0)   # likely returns around 104
            data | filtStd(std=0.1) | shape(0) # likely returns around 22

            # column mode
            data | apply(lambda x: ["a", x]) | filtStd(1, std=2) | shape(0) # likely returns around 104

            # inverse mode. Will only take values that are outside the std range
            data | ~filtStd(std=0.1) | shape(0) # likely returns around 88

        :param col: column to extract the value out of
        :param std: how many standard deviations above and below the mean to accept the values
        :param N: how many times to do this operation. ``filtStd(std=1, N=2)`` is equivalent to ``filtStd(std=1) | filtStd(std=1)``"""
        self.col = col; self.std = std; self.N = N; self.inverted = False
    def __invert__(self): res = filtStd(self.col, self.std); res.inverted = not self.inverted; return res
    def __ror__(self, it):
        if self.N != 1:
            f = filtStd(self.col, self.std, 1); f.inverted = self.inverted
            for i in range(self.N): it = it | f
            return it
        col = self.col; inv = self.inverted; fStd = self.std # filter std
        isPd = hasPandas and isinstance(it, pd.core.arraylike.OpsMixin)
        if col is None:
            if isinstance(it, k1lib.settings.cli.arrayTypes) or isPd:
                mean = it.mean(); std = it.std()
                minV = mean - std*fStd; maxV = mean + std*fStd
                if not inv: return it[(it >= minV) * (it <= maxV)]
                else: return it[(it < minV) + (it > maxV)]
            it = list(it); mean = it | cli.toMean(); std = it | cli.toStd()
            minV = mean - std*fStd; maxV = mean + std*fStd
            if not inv: return [v for v in it if minV <= v <= maxV]
            else: return [v for v in it if v < minV or v > maxV]
        else:
            if isinstance(it, k1lib.settings.cli.arrayTypes) or isPd:
                c = it[list(it)[col]] if isPd else it[:,col]
                mean = c.mean(); std = c.std()
                minV = mean - std*fStd; maxV = mean + std*fStd
                x = c if isPd else (c | cli.toMean().all())
                if not inv: return it[(minV <= x) * (x <= maxV)]
                else: return it[(minV > x) + (x > maxV)]
            row, it = it | cli.peek()
            if it == []: return []
            try: # sliceable? This optimization exists because the input might already have sliceable rows, so don't turn them into lists, to gain perf
                if len(row): it[0]
                it = list(it)
            except: it = [list(row) for row in it]
            x = [row[col] for row in it]
            mean = x | cli.toMean(); std = x | cli.toStd()
            minV = mean - std*fStd; maxV = mean + std*fStd
            if not inv: return [row for row in it if minV <= row[col] <= maxV]
            else: return [row for row in it if minV > row[col] or row[col] > maxV]
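# --- Illustrative sketch, not part of the library ---
# For plain numpy input with col=None, filtStd above reduces to keeping values within
# mean +/- k*std via a boolean mask. A hypothetical stand-alone equivalent:
def _filt_std_sketch(arr, k=2):
    mean = arr.mean(); std = arr.std()
    keep = (arr >= mean - k*std) & (arr <= mean + k*std)  # boolean mask of inliers
    return arr[keep]                                      # ~filtStd(std=k) would instead return arr[~keep]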