# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
"""
This is for functions that cuts out specific parts of the table
"""
from typing import Callable, Union, List, overload, Iterator, Any, Set, Tuple
from k1lib.cli.init import BaseCli, fastF; import k1lib.cli.init as init
import k1lib.cli as cli; import k1lib, json, os, math, traceback
from k1lib.cli.typehint import *
import numpy as np; from collections import deque
try: import torch; hasTorch = True
except: hasTorch = False
try: import pandas as pd; pd.core; hasPandas = True
except: hasPandas = False
__all__ = ["filt", "filter_", "inSet", "contains", "empty",
"isNumeric", "instanceOf",
"head", "tail", "cut", "rows",
"intersection", "union", "unique", "breakIf", "mask", "tryout", "resume",
"trigger", "filtStd"]
settings = k1lib.settings.cli
[docs]class filt(BaseCli): # filt
[docs] def __init__(self, predicate:Callable[[Any], bool], column:Union[int, List[int]]=None, catchErrors:bool=False): # filt
"""Filters out elements.
Examples::
# returns [2, 6], grabbing all the even elements
[2, 3, 5, 6] | filt(lambda x: x%2 == 0) | deref()
# returns [3, 5], grabbing all the odd elements
[2, 3, 5, 6] | ~filt(lambda x: x%2 == 0) | deref()
# returns [[2, 'a'], [6, 'c']], grabbing all the even elements in the 1st column
[[2, "a"], [3, "b"], [5, "a"], [6, "c"]] | filt(lambda x: x%2 == 0, 0) | deref()
# throws error, because strings can't mod divide
[1, 2, "b", 8] | filt(lambda x: x % 2 == 0) | deref()
# returns [2, 8]
[1, 2, "b", 8] | filt(lambda x: x % 2 == 0, catchErrors=True) | deref()
You can also pass in :class:`~k1lib.cli.modifier.op` or string, for extra intuitiveness and quickness::
# returns [2, 6]
[2, 3, 5, 6] | filt(op() % 2 == 0) | deref()
# returns ['abc', 'a12']
["abc", "def", "a12"] | filt(op().startswith("a")) | deref()
# returns [3, 4, 5, 6, 7, 8, 9]
range(100) | filt(3 <= op() < 10) | deref()
# returns [3, 4, 5, 6, 7, 8, 9]
range(100) | filt("3 <= x < 10") | deref()
See :class:`~k1lib.cli.modifier.aS` for more details on string mode. If you
pass in :class:`numpy.ndarray` or :class:`torch.Tensor`, then it will
automatically use the C-accelerated versions if possible, like this::
# returns np.array([2, 3, 4]), instead of iter([2, 3, 4])
np.array([1, 2, 3, 4]) | filt(lambda x: x>=2) | deref()
# returns [2, 3, 4], instead of np.array([2, 3, 4]), because `math.exp` can't operate on numpy arrays
np.array([1, 2, 3, 4]) | filt(lambda x: math.exp(x) >= 3) | deref()
If you need more extensive filtering capabilities involving text, check out :class:`~k1lib.cli.grep.grep`
If "filt" is too hard to remember, this cli also has an alias :class:`filter_`
that kinda mimics Python's ``filter()``.
:param predicate: function that returns True or False
:param column: if not specified, then filters elements of the input
array, else filters the specific column only (or columns, just
like in :class:`~k1lib.cli.modifier.apply`)
:param catchErrors: whether to catch errors in the function or not (reject
elements that raise errors). Runs slower if enabled though""" # filt
fs = [predicate]; super().__init__(fs); self.inverted = False; self.preInvFilt = None # filt
if column: # filt
ex = Exception(f"Filtering using a function on a negative-indexed column ({column}) is not supported") # filt
if isinstance(column, int): # filt
if column < 0: raise ex # filt
else: # filt
column = list(column) # filt
if len([c for c in column if c < 0]): raise ex # filt
self.f = f = fs[0]; _fP = fastF(f); self.column = column; self._fPArr = None; self.pArr = None # filt
if catchErrors: # filt
def g(x): # filt
try: return _fP(x) # filt
except: return False # filt
self.predicate = g # filt
else: self.predicate = _fP # filt
def _injectPArr(self, pArr): # filt
if pArr is not None: self.pArr = pArr; self._fPArr = fastF(pArr) # filt
return self # explicit vectorized predicate, can be injected by downstream clis # filt
[docs] def __ror__(self, it:Iterator[Any]) -> Iterator[Any]: # filt
p = self.predicate; c = self.column; fusedPArr = self._fPArr or p # filt
isPd = hasPandas and isinstance(it, pd.core.arraylike.OpsMixin) # filt
if c is None: # filt
if isinstance(it, settings.arrayTypes) or isPd: # filt
a = (it | cli.apply(p)) if self._fPArr is None else self._fPArr(it) # filt
try: return it[a] # filt
except: pass # filt
return (l for l in it if p(l)) # filt
elif isinstance(c, int): # filt
if isinstance(it, settings.arrayTypes): # filt
try: return it[fusedPArr(it[:,c])] # filt
except: pass # filt
if isPd: # filt
try: return it[fusedPArr(it[list(it)[c]])] # filt
except: pass # filt
def gen(): # filt
for es in init.dfGuard(it): # filt
es = list(es) # filt
if c < len(es) and p(es[c]): yield es # filt
return gen() # filt
else: # list of ints # filt
ops = [] # filt
for c_ in c: ops.append(filt(self.predicate, c_, False)._injectPArr(self.pArr)) # filt
return it | cli.serial(*ops) # filt
[docs] def __invert__(self): # filt
"""Negate the condition""" # filt
if self.inverted: raise Exception("Can't invert filt() twice!") # filt
def f(s): # filt
if isinstance(s, settings.arrayTypes): # filt
res = self.predicate(s) # can cause an exception, but that's ok, as that's the signal telling the code in __ror__ to not pass in array types # filt
if isinstance(res, settings.arrayTypes): return ~res # filt
return not self.predicate(s) # filt
ans = filt(f, self.column); ans.preInvFilt = self; ans.inverted = True; return ans # filt
def __neg__(self): # filt
"""Also negates the condition""" # filt
return ~self # filt
[docs] def split(self): # filt
"""Splits the input into positive and negative samples.
Example::
# returns [[0, 2, 4, 6, 8], [1, 3, 5, 7, 9]]
range(10) | filt(lambda x: x%2 == 0).split() | deref()
# also returns [[0, 2, 4, 6, 8], [1, 3, 5, 7, 9]], exactly like above
range(10) | filt(lambda x: x%2 == 0) & filt(lambda x: x%2 != 0) | deref()""" # filt
f = self.predicate; c = self.column; return filt(f, c) & ~filt(f, c) # filt
def _jsF(self, meta): # filt
fIdx = init._jsFAuto(); dataIdx = init._jsDAuto(); argIdx = init._jsDAuto(); inverted = False # filt
if self.inverted: self = self.preInvFilt; inverted = True # filt
header, _fIdx, _async = k1lib.kast.asyncGuard(k1lib.kast.prepareFunc3(self.f, ("filt", meta))) # filt
return f"{header}\n{fIdx} = {'async ' if _async else ''}({dataIdx}) => {'await ' if _async else ''}{dataIdx}.filt{'_async' if _async else ''}({'async ' if _async else ''}({argIdx}) => {'!' if inverted else ''}({'await ' if _async else ''}{_fIdx}({argIdx})), {cli.kjs.v(self.column)})", fIdx # filt
filter_ = filt # filt
[docs]class inSet(filt): # inSet
[docs] def __init__(self, values:Set[Any], column:int=None, inverse=False): # inSet
"""Filters out lines that is not in the specified set.
Example::
# returns [2, 3]
range(5) | inSet([2, 8, 3]) | deref()
# returns [0, 1, 4]
range(5) | ~inSet([2, 8, 3]) | deref()""" # inSet
if not isinstance(values, (set, dict)): values = set(values) # inSet
super().__init__((lambda l: l in values) if not inverse else (lambda l: not l in values), column) # inSet
self.values = values; self.column = column; self.inverse = inverse # inSet
[docs] def __invert__(self): return inSet(self.values, self.column, not self.inverse) # inSet
def _jsF(self, meta): # inSet
fIdx = init._jsFAuto(); dataIdx = init._jsDAuto(); setIdx = init._jsDAuto() # inSet
return f"{setIdx} = {json.dumps(list(self.values))};\n{fIdx} = ({dataIdx}) => {dataIdx}.inSet({setIdx}, {cli.kjs.v(self.column)}, {cli.kjs.v(self.inverse)})", fIdx # inSet
[docs]class contains(filt): # contains
[docs] def __init__(self, s:str, column:int=None, inverse=False): # contains
"""Filters out lines that don't contain the specified substring. Sort of similar
to :class:`~k1lib.cli.grep.grep`, but this is simpler, and can be inverted.
Example::
# returns ['abcd', '2bcr']
["abcd", "0123", "2bcr"] | contains("bc") | deref()""" # contains
super().__init__((lambda e: s in e) if not inverse else (lambda e: not s in e), column) # contains
self.s = s; self.column = column; self.inverse = inverse # contains
[docs] def __invert__(self): return contains(self.s, self.column, not self.inverse) # contains
def _jsF(self, meta): # contains
fIdx = init._jsFAuto(); dataIdx = init._jsDAuto(); sIdx = init._jsDAuto() # contains
return f"{sIdx} = {json.dumps(self.s)};\n{fIdx} = ({dataIdx}) => {dataIdx}.contains({sIdx}, {cli.kjs.v(self.column)}, {cli.kjs.v(self.inverse)})", fIdx # contains
[docs]class empty(BaseCli): # empty
[docs] def __init__(self, reverse=False): # empty
"""Filters out streams that is not empty. Almost always used inverted,
but "empty" is a short, sweet name that's easy to remember. Example::
# returns [[1, 2], ['a']]
[[], [1, 2], [], ["a"]] | ~empty() | deref()
:param reverse: not intended to be used by the end user. Do ``~empty()`` instead.""" # empty
super().__init__(); self.reverse = reverse # empty
[docs] def __ror__(self, streams:Iterator[Iterator[Any]]) -> Iterator[Iterator[Any]]: # empty
r = self.reverse # empty
for stream in streams: # empty
try: # empty
item, it = stream | cli.peek() # empty
if not r: # empty
if it == []: yield it # empty
else: # empty
if it != []: yield it # empty
except StopIteration: pass # empty
[docs] def __invert__(self): # empty
return empty(not self.reverse) # empty
[docs]def isNumeric(column:int=None) -> filt: # isNumeric
"""Filters out a line if that column is not a number.
Example::
# returns [0, 2, '3']
[0, 2, "3", "a"] | isNumeric() | deref()""" # isNumeric
def f(v): # isNumeric
try: float(v); return True # isNumeric
except ValueError: return False # isNumeric
return filt(f, column) # isNumeric
[docs]def instanceOf(cls:Union[type, Tuple[type]], column:int=None) -> filt: # instanceOf
"""Filters out lines that is not an instance of the given type.
Example::
# returns [2]
[2, 2.3, "a"] | instanceOf(int) | deref()
# returns [2, 2.3]
[2, 2.3, "a"] | instanceOf((int, float)) | deref()""" # instanceOf
if isinstance(cls, list): cls = tuple(cls) # instanceOf
return filt(lambda e: isinstance(e, cls), column) # instanceOf
def sliceable(it): # sliceable
try: it[:]; len(it); return True # sliceable
except: return False # sliceable
def _head(n, inverted, it): # _head
it = iter(it) # _head
if n is None: # _head
if not inverted: yield from it # _head
else: return # _head
elif n >= 0: # _head
if not inverted: # _head
for i, line in zip(range(n), it): yield line # _head
else: # _head
for i, line in zip(range(n), it): pass # _head
yield from it # _head
else: # _head
if not inverted: # head to -3 # _head
n = abs(n); queue = deque() # _head
for line in it: # _head
queue.append(line) # _head
if len(queue) > n: yield queue.popleft() # _head
else: yield from deque(it, -n) # -3 to end # _head
[docs]class head(BaseCli): # head
[docs] def __init__(self, n=10): # head
"""Only outputs first ``n`` elements. You can also negate it (like
``~head(5)``), which then only outputs after first ``n`` lines. Examples::
"abcde" | head(2) | deref() # returns ["a", "b"]
"abcde" | ~head(2) | deref() # returns ["c", "d", "e"]
"0123456" | head(-3) | deref() # returns ['0', '1', '2', '3']
"0123456" | ~head(-3) | deref() # returns ['4', '5', '6']
"012" | head(None) | deref() # returns ['0', '1', '2']
"012" | ~head(None) | deref() # returns []
You can also pass in fractional head::
range(20) | head(0.25) | deref() # returns [0, 1, 2, 3, 4], or the first 25% of samples
Also works well and fast with :class:`numpy.ndarray`, :class:`torch.Tensor`
and other sliceable types::
# returns (10,)
np.linspace(1, 3) | head(10) | shape()""" # head
super().__init__(); self.n = n; self.inverted = False; self._sliceable = None # head
def _all_array_opt(self, it, level): # head
n = self.n; inverted = self.inverted # head
if n is not None and round(n) != n: n = int(it.shape[level]*n) # fractional head # head
sl = tuple([slice(None)]*level); return it[(*sl, slice(n, None))] if inverted else it[(*sl, slice(None, n))] # head
def _typehint(self, inp): # head
if isinstance(inp, tListIter): return inp # head
if isinstance(inp, tArrayTypes): return inp # head
if inp == str: return str # head
return tIter(tAny()) # head
[docs] def __ror__(self, it:Iterator[Any]) -> Iterator[Any]: # head
n = self.n; inverted = self.inverted # head
if n is not None and round(n) != n: # fractional head # head
if not sliceable(it): it = list(it) # head
i = int(len(it)*n) # head
return it[i:] if inverted else it[:i] # head
if inverted and n is None: return [] # special case # head
_sliceable = self._sliceable # all of this to cache sliceable value, because it takes a lot of time to determine whether it's sliceable or not # head
if _sliceable is None: self._sliceable = _sliceable = sliceable(it) # head
if _sliceable: return it[n:] if inverted else it[:n] # head
else: return _head(self.n, self.inverted, it) # head
[docs] def __invert__(self): # head
h = head(self.n); h.inverted = not self.inverted # head
return h # head
[docs] def split(self): # head
"""Splits the list up into a head and tail sections.
Example::
# returns [[0, 1, 2, 3], [4, 5, 6, 7, 8, 9]]
range(10) | head(4).split() | deref()
This only splits it into 2 parts. If you want to split it up
into many more parts with specified checkpoints, check out
:class:`~k1lib.cli.structural.splitC`.""" # head
# return self & ~self # old version # head
return headSplit(self.n, self.inverted) # head
def _jsF(self, meta): # head
fIdx = init._jsFAuto(); dataIdx = init._jsDAuto(); argIdx = init._jsDAuto() # head
return f"{fIdx} = ({dataIdx}) => {dataIdx}.head({self.n}, {cli.kjs.v(self.inverted)})", fIdx # head
class headSplit(BaseCli): # headSplit
def __init__(self, n, inverted): # headSplit
self.n = n; self.inverted = inverted # headSplit
self.fixup = n is None or isinstance(n, float) or n < 0 # headSplit
self.sliceable = None # headSplit
def _all_array_opt(self, it, level): # headSplit
n = self.n; inverted = self.inverted # headSplit
if n is not None and round(n) != n: n = int(it.shape[level]*n) # fractional head # headSplit
sl = tuple([slice(None)]*level); b = it[(*sl, slice(n, None))]; a = it[(*sl, slice(None, n))] # headSplit
return (b, a) if inverted else (a, b) # headSplit
def __ror__(self, it): # headSplit
sliceable_ = self.sliceable; n = self.n; inverted = self.inverted # headSplit
if sliceable_ is None: self.sliceable = sliceable_ = sliceable(it) # headSplit
it = it if sliceable_ else list(it) # headSplit
if self.fixup: # needs to fix n to a more definite value. Just to make it faster # headSplit
l = len(it) # headSplit
if n is None: return it, [] # headSplit
if isinstance(n, float): n = int(l*n) # fractional head # headSplit
n = (n+l)%l # headSplit
return (it[n:], it[:n]) if inverted else (it[:n], it[n:]) # headSplit
[docs]class tail(BaseCli): # tail
[docs] def __init__(self, n:int=10): # tail
"""Basically an inverted :class:`head`.
Examples::
range(10) | tail(3) | deref() # returns [7, 8, 9]""" # tail
self.n = n # tail
[docs] def __ror__(self, it): return it | ~head(-self.n) # tail
def _jsF(self, meta): # tail
fIdx = init._jsFAuto(); dataIdx = init._jsDAuto(); argIdx = init._jsDAuto() # tail
return f"{fIdx} = ({dataIdx}) => {dataIdx}.head(-({self.n}), true)", fIdx # tail
class lazyList: # lazyList
def __init__(self, it): # lazyList
self.it = iter(it); self.elems = [] # lazyList
def __getitem__(self, idx): # lazyList
elems = self.elems; it = self.it # lazyList
for _ in range(len(elems)-1, idx): elems.append(next(it)) # lazyList
return elems[idx] # lazyList
[docs]class rows(BaseCli): # rows
[docs] def __init__(self, *rows:List[int]): # rows
"""Selects specific elements given an iterator of indexes.
Space complexity O(1) as a list is not constructed (unless you're
slicing it in really weird way). Example::
"0123456789" | rows(2) | toList() # returns ["2"]
"0123456789" | rows(5, 8) | toList() # returns ["5", "8"]
"0123456789" | rows()[2:5] | toList() # returns ["2", "3", "4"]
"0123456789" | ~rows()[2:5] | toList() # returns ["0", "1", "5", "6", "7", "8", "9"]
"0123456789" | ~rows()[:7:2] | toList() # returns ['1', '3', '5', '7', '8', '9']
"0123456789" | rows()[:-4] | toList() # returns ['0', '1', '2', '3', '4', '5']
"0123456789" | ~rows()[:-4] | toList() # returns ['6', '7', '8', '9']
Why it's called "rows" is because I couldn't find a good name for
it. There was :class:`cut`, which the name of an actual bash cli
that selects out columns given indicies. When I needed a way to
do what this cli does, it was in the context of selecting out rows,
so the name stuck.
If you want to just pick out the nth item from the iterator, instead of doing
this::
iter(range(10)) | rows(3) | item() # returns 3
... you can use the shorthand :class:`~k1lib.cli.utils.rItem` instead::
iter(range(10)) | rItem(3) # returns 3
:param rows: ints for the row indices""" # rows
if len(rows) == 1 and isinstance(rows[0], slice): self.slice = rows[0]; self.rows = None; self.idxMode = False # rows
else: self.slice = None; self.rows = rows; self.sortedRows = sorted(rows); self.idxMode = True # rows
self.inverted = False # rows
def __getitem__(self, _slice): # rows
start, stop, step = _slice.start, _slice.stop, _slice.step # rows
if step == None or step == 1: # rows
if start == None and stop == None: return cli.iden() # rows
if start == None: return head(stop) # rows
if stop == None: return ~head(start) # rows
elif step == 0: return cli.ignore() # rows
answer = rows(_slice); answer.inverted = self.inverted; return answer # rows
def _all_array_opt(self, it, level:int): # rows
a = np.array(self.rows) if self.rows else self.slice; s = [slice(None, None, None)]*level # rows
if self.inverted: mask = np.ones(it.shape[level], dtype=bool); mask[a] = False; return it[(*s, mask)] # rows
return it[(*s, a)] # rows
[docs] def __invert__(self): self.inverted = not self.inverted; return self # rows
[docs] def __ror__(self, it:Iterator[str]): # rows
idxMode = self.idxMode; inverted = self.inverted; sl = self.slice; rw = self.rows # rows
if hasPandas and isinstance(it, pd.core.frame.DataFrame): # rows
if sl is not None: return it.iloc[sl] if not inverted else it.iloc[np.array(list(set(range(len(it))) - set(range(len(it))[sl])))] # rows
else: return it.iloc[list(rw)] if not inverted else it.iloc[np.array(list(set(range(len(it))) - set(rw)))] # rows
def gen(it): # rows
if not inverted: # rows
if idxMode: # rows
if len(self.sortedRows) == 0: return # rows
it = list(it) if self.sortedRows[0] < 0 else lazyList(it) # rows
for idx in rw: yield it[idx] # rows
else: yield from list(it)[sl] # rows
else: # rows
it = list(it); n = len(it); idxs = set((e if e >= 0 else n+e) for e in rw) if idxMode else set(range(n)[sl]) # rows
yield from (e for i, e in enumerate(it) if i not in idxs) # rows
if isinstance(it, settings.arrayTypes): # rows
a = np.array(rw) if rw else sl # rows
if inverted: mask = np.ones(len(it), dtype=bool); mask[a] = False; return it[mask] # rows
else: return it[a] # rows
return gen(it) # rows
def _jsF(self, meta): # rows
fIdx = init._jsFAuto(); dataIdx = init._jsDAuto() # rows
if not self.slice is None: raise Exception("rows._jsF() doesn't support slice-based indexing yet") # rows
if self.inverted: raise Exception("rows._jsF() doesn't support inversion yet") # rows
return f"{fIdx} = ({dataIdx}) => {dataIdx}.rows({cli.kjs.vs(self.rows) | cli.join(', ')})", fIdx # rows
[docs]class cut(BaseCli): # cut
[docs] def __init__(self, *columns:List[int]): # cut
"""Cuts out specific columns, sliceable. Examples::
["0123456789", "abcdefghij"] | cut(5, 8) | deref() # returns [['5', '8'], ['f', 'i']]
["0123456789", "abcdefghij"] | cut(8, 5) | deref() # returns [['8', '5'], ['i', 'f']], demonstrating permutation-safe
["0123456789"] | cut(5, 8) | deref() # returns [['5', '8']]
["0123456789"] | cut(8, 5) | deref() # returns [['8', '5']], demonstrating permutation-safe
["0123456789", "abcdefghij"] | cut(2) | deref() # returns ['2', 'c'], instead of [['2'], ['c']] as usual
["0123456789"] | cut(2) | deref() # returns ['2']
["0123456789"] | cut(5, 8) | deref() # returns [['5', '8']]
["0123456789"] | ~cut()[:7:2] | deref() # returns [['1', '3', '5', '7', '8', '9']]
In the first example, you can imagine that we're operating on this table::
0123456789
abcdefghij
Then, we want to grab the 5th and 8th column (0-indexed), which forms this table::
58
fi
So, result of that is just ``[['5', '8'], ['f', 'i']]``
In the fourth example, if you're only cutting out 1 column, then it
will just grab that column directly, instead of putting it in a list.
If you pass in :class:`numpy.ndarray` or :class:`torch.Tensor`, then it will
automatically use the C-accelerated versions, like this::
torch.randn(4, 5, 6) | cut(2, 3) # returns tensor of shape (4, 2, 6)
torch.randn(4, 5, 6) | cut(2) # returns tensor of shape (4, 6)
torch.randn(4, 5, 6) | ~cut()[2:] # returns tensor of shape (4, 2, 6)
.. warning::
TD;DR: inverted negative indexes are a bad thing when rows don't have the same number of elements
Everything works fine when all of your rows have the same number of elements. But things might behave a
little strangely if they don't. For example::
# returns [['2', '3', '4'], ['2', '3', '4', '5', '6', '7']]. Different number of columns, works just fine
["0123456", "0123456789"] | cut()[2:-2] | deref()
# returns [['0', '1', '8', '9'], ['a', 'b', 'i', 'j']]. Same number of columns, works just fine
["0123456789", "abcdefghij"] | ~cut()[2:-2] | deref()
# returns [['0', '1', '5', '6'], ['0', '1', '5', '6', '7', '8', '9']]. Different number of columns, unsupported invert case
["0123456", "0123456789"] | ~cut()[2:-2] | deref()
Why does this happen? It peeks at the first row, determines that ~[2:-2] is equivalent
to [:2] and [5:] combined and not [:2] and [-2:] combined. When applied to the second row,
[-2:] goes from 5->9, hence the result. Another edge case would be::
# returns [['0', '1', '2', '3', '5', '6'], ['0', '1', '2', '3', '5', '6', '7', '8', '9']]
["0123456", "0123456789"] | ~cut(-3) | deref()
Like before, it peeks the first row and translate ~(-3) into ~4, which is equivalent to [:4] and [5:].
But when applied to the second row, it now carries the meaning ~4, instead of ~(-3).
Why don't I just fix these edge cases? Because the run time for it would be completely unacceptable,
as we'd have to figure out what's the columns to include in the result for every row. This could
easily be O(n^3). Of course, with more time optimizing, this could be solved, but this is the only
extreme edge case and I don't feel like putting in the effort to optimize it.""" # cut
super().__init__() # cut
if len(columns) == 1 and isinstance(columns[0], slice): columns = columns[0] # columns is either a slice object, or a list of ints # cut
self.columns = columns; self.inverted = False # columns: list[int] | slice # cut
def _all_array_opt(self, it, level): # cut
c = self.columns; r = rows(c) if isinstance(c, slice) else rows(*c) # cut
if self.inverted: r = ~r # cut
it = it | r.all(level+1); return (it | cli.item().all(level+1)) if not isinstance(c, slice) and len(c) == 1 else it # cut
[docs] def __ror__(self, it): # cut
columns = self.columns; inverted = self.inverted # cut
if hasPandas and isinstance(it, pd.core.frame.DataFrame): # cut
itt = (it | cli.T()); n = len(itt); colNames = list(it) # cut
if isinstance(columns, slice): # cut
columns = list(sorted(set(range(n)) - set(range(n)[self.columns]))) if self.inverted else range(len(colNames))[columns] # cut
return it[[colNames[x] for x in columns]] # cut
else: # cut
columns = list(sorted(set(range(n)) - set(self.columns))) if self.inverted else self.columns # cut
return it[colNames[columns[0]]] if len(columns) == 1 and not inverted else it[[colNames[x] for x in columns]] # cut
isArray = isinstance(it, settings.arrayTypes)#; isArray = False # cut
if isArray: nCols = len(it[0]); prs = rs = range(nCols) # range(nColumns). "prs" for padded rs # cut
else: # carefully peaking first row and get the number of columns # cut
it = iter(it); sentinel = object(); row = next(it, sentinel) # cut
if row is sentinel: return [] # cut
row = list(row); it = it | cli.insert(row); nCols = len(row) # cut
rs = range(nCols); prs = range(nCols+20) # 20 for longer rows below. Also "rs" is not a great name, deal with it # cut
if isinstance(columns, slice): # cut
if not inverted: return it[:,columns] if isArray else (row[columns] for row in (list(row) for row in it)) # cut
columns = list(set(rs[columns])) # cut
columns = [e if e >= 0 else nCols + e for e in columns] # clear negative indicies # cut
if self.inverted: columns = list(set(e for e in prs if e not in columns)) # cut
if len(columns) == 1: c = columns[0]; return it[:,c] if isArray else (r[c] for r in (list(row) for row in it) if len(r) > c) # cut
else: return it[:,columns] if isArray else ([row[c] for c in columns if c < len(row)] for row in (list(row) for row in it)) # cut
def __getitem__(self, idx): answer = cut(idx); answer.inverted = self.inverted; return answer # cut
[docs] def __invert__(self): self.inverted = not self.inverted; return self # cut
def _jsF(self, meta): # cut
fIdx = init._jsFAuto(); dataIdx = init._jsDAuto() # cut
if isinstance(self.columns, slice): raise Exception("cut._jsF() doesn't support slice-based indexing yet") # cut
return f"{fIdx} = ({dataIdx}) => {dataIdx}.{'cutInv' if self.inverted else 'cut'}({cli.kjs.vs(self.columns) | cli.join(', ')})", fIdx # cut
[docs]class intersection(BaseCli): # intersection
[docs] def __init__(self, column=None, full=False): # intersection
"""Returns the intersection of multiple streams.
Example::
# returns set([2, 4, 5])
[[1, 2, 3, 4, 5], [7, 2, 4, 6, 5]] | intersection()
# returns ['2g', '4h', '5j']
[["1a", "2b", "3c", "4d", "5e"], ["7f", "2g", "4h", "6i", "5j"]] | intersection(0) | deref()
If you want the full distribution, meaning the intersection, as well
as what's left of each stream, you can do something like this::
# returns [{2, 4, 5}, [1, 3], [7, 6]]
[[1, 2, 3, 4, 5], [7, 2, 4, 6, 5]] | intersection(full=True) | deref()
:param column: what column to apply the intersection
on. Defaulted to None
:param full: if specified, return the full distribution, instead of the intersection alone""" # intersection
super().__init__(); self.column = column # intersection
self.full = full # intersection
self.f = intersection(column, False) if full else None # intersection
def _typehint(self, inp): # intersection
if self.column is None: # intersection
if isinstance(inp, tArrayTypes): return tSet(inp.child) # intersection
if isinstance(inp, tListIterSet): # intersection
if isinstance(inp.child, tListIterSet): # intersection
return tSet(inp.child.child) # intersection
return tSet(tAny()) # intersection
if isinstance(inp, tCollection): # intersection
a = inp.children[0] # intersection
for e in inp.children: # intersection
if not isinstance(e, tListIterSet): return tSet(tAny()) # intersection
if e.child != a.child: return tSet(tAny()) # intersection
return tSet(a.child) # intersection
return tSet(tAny()); # intersection
else: return tAny() # intersection
[docs] def __ror__(self, its:Iterator[Iterator[Any]]) -> Set[Any]: # intersection
c = self.column; full = self.full; f = self.f # intersection
if full: # intersection
if c is None: # intersection
its = its | cli.deref(2); inter = its | f # intersection
return [inter, *((e for e in it if e not in inter) for it in its)] # intersection
else: raise Exception("intersection(int, True) mode not supported yet, as it's a little ambiguous what's the use case is, and there're many styles of functionality that this can take on") # intersection
if c is None: # intersection
answer = None # intersection
for it in its: # intersection
if answer is None: answer = set(it); continue # intersection
answer = answer.intersection(it) # intersection
return set() if answer is None else answer # intersection
else: # intersection
its = its | cli.deref(2); ans = {} # intersection
ids = its | cut(c).all() | intersection() | cli.aS(set) # intersection
for it in its: # intersection
for row in it: # intersection
e = row[c] # intersection
if e in ids: ans[e] = row # intersection
return ans.values() # intersection
def _jsF(self, meta): # intersection
fIdx = init._jsFAuto(); dataIdx = init._jsDAuto() # intersection
if not self.column is None: raise Exception("intersection._jsF() doesn't support targeting specific .column yet") # intersection
if not self.full: raise Exception("intersection._jsF() doesn't support .full yet") # intersection
return f"{fIdx} = ({dataIdx}) => {dataIdx}.intersection()", fIdx # intersection
[docs]class union(BaseCli): # union
[docs] def __init__(self): # union
"""Returns the union of multiple streams.
Example::
# returns {0, 1, 2, 10, 11, 12, 13, 14}
[range(3), range(10, 15)] | union()
""" # union
super().__init__() # union
def _typehint(self, inp): # union
return intersection()._typehint(inp) # union
[docs] def __ror__(self, its:Iterator[Iterator[Any]]) -> Set[Any]: # union
answer = set() # union
for it in its: answer = set.union(answer, set(it)) # union
return answer # union
def _jsF(self, meta): # union
fIdx = init._jsFAuto(); dataIdx = init._jsDAuto() # union
return f"{fIdx} = ({dataIdx}) => {dataIdx}.union()", fIdx # union
[docs]class unique(BaseCli): # unique
[docs] def __init__(self, column:int=None): # unique
"""Filters out non-unique row elements.
Example::
# returns [[1, "a"], [2, "a"]]
[[1, "a"], [2, "a"], [1, "b"]] | unique(0) | deref()
# returns [0, 1, 2, 3, 4]
[*range(5), *range(3)] | unique() | deref()
In the first example, because the 3rd element's first column is
1, which has already appeared, so it will be filtered out.
:param column: the column to detect unique elements. Can be
None, which will behave like converting the input iterator
into a set, but this cli will maintain the order""" # unique
super().__init__(); self.column = column # unique
[docs] def __ror__(self, it): # unique
c = self.column # unique
if c is None: # unique
if isinstance(it, settings.arrayTypes): bm = np if isinstance(it, np.ndarray) else (torch if hasTorch and isinstance(it, torch.Tensor) else None); return bm.unique(it) # unique
def gen(): # unique
terms = set() # unique
for e in it: # unique
if e not in terms: yield e # unique
terms.add(e) # unique
else: # unique
def gen(): # unique
terms = set() # unique
for row in it: # unique
row = list(row); e = row[c] # unique
if e not in terms: yield row # unique
terms.add(e) # unique
return gen() # unique
def _jsF(self, meta): # unique
fIdx = init._jsFAuto(); dataIdx = init._jsDAuto() # unique
return f"{fIdx} = ({dataIdx}) => {dataIdx}.unique({cli.kjs.v(self.column)})", fIdx # unique
[docs]class breakIf(BaseCli): # breakIf
[docs] def __init__(self, f, col:int=None): # breakIf
"""Breaks the input iterator if a condition is met.
Example::
# returns [0, 1, 2, 3, 4, 5]
[*range(10), 2, 3] | breakIf(lambda x: x > 5) | deref()
# returns [[1, 'a'], [2, 'b']]
[[1, "a"], [2, "b"], [3, "c"], [2, "d"], [1, "e"]] | breakIf("x > 2", 0) | deref()
:param col: column to apply the condition on""" # breakIf
fs = [f]; super().__init__(fs); self.f = fs[0]; self._fC = fastF(self.f); self.col = col # breakIf
def _typehint(self, inp): # breakIf
if isinstance(inp, tListIterSet): return tIter(inp.child) # breakIf
return tIter(tAny()) # breakIf
[docs] def __ror__(self, it:Iterator[Any]) -> Iterator[Any]: # breakIf
f = self._fC; col = self.col # breakIf
if hasPandas and isinstance(it, pd.core.arraylike.OpsMixin): # breakIf
ndim = len(it | cli.shape()) # breakIf
a = init.preprocessPd(it, col, lambda x: bool(f(x)), lambda x: f(x).astype(bool)) # breakIf
return it[:a.argmax()] if a[a.argmax()] else it # breakIf
def gen(): # breakIf
if col is None: # breakIf
for line in it: # breakIf
if f(line): break # breakIf
yield line # breakIf
else: # breakIf
for row in it: # breakIf
if f(row[col]): break # breakIf
yield row # breakIf
return gen() # breakIf
def _jsF(self, meta): # breakIf
fIdx = init._jsFAuto(); dataIdx = init._jsDAuto(); argIdx = init._jsDAuto() # breakIf
header, _fIdx = k1lib.kast.prepareFunc3(self.f, ("breakIf", meta)) # breakIf
return f"{header}\n{fIdx} = ({dataIdx}) => {dataIdx}.breakIf(({argIdx}) => {_fIdx}({argIdx}))", fIdx # breakIf
[docs]class mask(BaseCli): # mask
[docs] def __init__(self, mask:Iterator[bool]): # mask
"""Masks the input stream.
Example::
# returns [0, 1, 3]
range(5) | mask([True, True, False, True, False]) | deref()
# returns [2, 4]
range(5) | ~mask([True, True, False, True, False]) | deref()
# returns torch.tensor([0, 1, 3])
torch.tensor(range(5)) | mask([True, True, False, True, False])""" # mask
super().__init__(); self.mask = mask; self.inverted = False # mask
[docs] def __invert__(self): res = mask(self.mask); res.inverted = not self.inverted; return res # mask
[docs] def __ror__(self, it): # mask
if self.inverted: # mask
if isinstance(it, settings.arrayTypes): return it[[not e for e in self.mask]] # mask
if hasPandas and isinstance(it, pd.core.arraylike.OpsMixin): return it[[not e for e in self.mask]] # mask
return (e for e, m in zip(it, self.mask) if not m) # mask
else: # mask
if isinstance(it, settings.arrayTypes): return it[list(self.mask)] # mask
if hasPandas and isinstance(it, pd.core.arraylike.OpsMixin): return it[list(self.mask)] # mask
return (e for e, m in zip(it, self.mask) if m) # mask
[docs]class tryout(BaseCli): # tryout
[docs] def __init__(self, result=None, retries=0, mode="result"): # tryout
"""Wraps every cli operation after this in a try-catch block, returning ``result``
if the operation fails. Example::
# returns 9
3 | (tryout("failed") | op()**2)
# returns "failed", instead of raising an exception
"3" | (tryout("failed") | op()**2)
# special mode: returns "unsupported operand type(s) for ** or pow(): 'str' and 'int'"
"3" | (tryout(mode="str") | op()**2)
# special mode: returns entire trace stack (do `import traceback` first)
"3" | (tryout(mode="traceback") | op()**2)
# special mode: returns "3", the input of the tryout() block
"3" | (tryout(mode="input") | op()**2)
By default, this ``tryout()`` object will gobble up all clis behind it and wrap
them inside a try-catch block. This might be undesirable, so you can stop it early::
# returns "failed"
3 | (tryout("failed") | op()**2 | aS(str) | op()**2)
# raises an exception, because it errors out after the tryout()-captured operations
3 | (tryout("failed") | op()**2) | aS(str) | op()**2
In the first example, :class:`tryout` will catch any errors happening within ``op()``,
``aS(str)`` or the second ``op()**2``. In the second example, :class:`tryout` will only
catch errors happening within the first ``op()**2``.
.. admonition:: Array mode
The above works well for atomic operations and not looping operations. Let's
say we have this function::
counter = 0
def f(x):
global counter
if x > 5:
counter += 1
if counter < 3: raise Exception(f"random error: {x}")
return x**2
This code will throw an error if x is greater than 5 for the first and second
time (but runs smoothly after that. It's a really nasty function I know).
Capturing like this will work::
counter = 0 # line below returns [0, 1, 4, 9, 16, 25, 'failed', 'failed', 64, 81]
range(10) | apply(tryout("failed") | aS(f)) | deref()
But capturing like this won't work::
counter = 0 # line below throws an exception
range(10) | (tryout("failed") | apply(f)) | deref()
The reason being, :class:`tryout` will only capture errors when the data is passed
into ``apply(f)``, and won't capture it later on. However, when data is passed to
``apply(f)``, it hasn't executed anything yet (remember these things are lazily
executed). So the exception actually happens when you're trying to ``deref()`` it,
which lies outside of :class:`tryout`'s reach. You can just put a tilde in front
to tell it to capture errors for individual elements in the iterator::
counter = 0 # line belows returns [0, 1, 4, 9, 16, 25, 'failed', 'failed', 64, 81]
range(10) | (~tryout("failed") | apply(f)) | deref()
This mode has a weird quirk that requires that there has to be a 1-to-1 correspondence
between the input and output for the block of code that it wraps around. Meaning this is okay::
def g(x):
global counter
if 40 > x[0] >= 30:
counter += 1
if counter < 5: raise Exception("random error")
return x
counter = 0 # returns 50, corrects errors as if it's not even there!
range(50) | (~tryout(None, 6) | batched(10, True) | apply(g) | joinStreams()) | deref() | shape(0)
This is okay because going in, there're 50 elements, and it's expected that 50 elements
goes out of :class:`tryout`. The input can be of infinite length, but there has to be a
1-to-1 relationship between the input and output. While this is not okay::
counter = 0 # returns 75, data structure corrupted
range(50) | (~tryout(None, 6) | batched(10, True) | apply(g) | joinStreams() | batched(2, True)) | joinStreams() | deref() | shape(0)
It's not okay because it's expected that 25 pairs of elements goes out of :class:`tryout`
.. admonition:: Retrying
There's also the ``retries`` parameter, which specifies how many times should this
class retry the operation until actually returning the predefined result::
counter = 0 # line below returns [0, 1, 4, 9, 16, 25, None, None, 64, 81]
range(10) | (~tryout(retries=0) | apply(f)) | deref()
counter = 0 # line below returns [0, 1, 4, 9, 16, 25, None, 49, 64, 81]
range(10) | (~tryout(retries=1) | apply(f)) | deref()
counter = 0 # line below returns [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
range(10) | (~tryout(retries=2) | apply(f)) | deref()
:param result: result to return if there is an exception
:param mode: if "result" (default), returns the result if there's an exception. If "str" then
returns the exception's string. If "input" then returns the original input. If "traceback"
then returns the exception's traceback
:param retries: how many time to retry before giving up?""" # tryout
super().__init__(capture=True); self.result = result; self.inverted = False; self.retries = retries; self.mode = mode # tryout
[docs] def __ror__(self, it): # tryout
retries = self.retries; result = self.result; mode = self.mode; it = init.dfGuard(it) # tryout
if len(self.capturedClis) == 0: raise Exception("tryout() currently does not wrap around any other cli. You may need to change `data | tryout() | cli1() | cli2()` into `data | (tryout() | cli1() | cli2())`") # tryout
if not self.inverted: # single mode # tryout
while True: # tryout
try: return it | self.capturedSerial # tryout
except Exception as e: # tryout
if retries <= 0: return str(e) if mode == "str" else (it if mode == "input" else (traceback.format_exc() if mode == "traceback" else result)) # tryout
retries -= 1 # tryout
else: # array mode # tryout
def gen(it): # tryout
patience = retries; savedInputs = k1lib.Wrapper(deque()); ogInp = None # tryout
def interceptIt(it): # tryout
for e in it: savedInputs().append(e); yield e # tryout
it = iter(it); ogIt = it; it = interceptIt(it); outIt = it | self.capturedSerial # tryout
while True: # tryout
try: e = next(outIt); yield e; savedInputs().popleft(); patience = retries # tryout
except StopIteration: break # tryout
except Exception as e: # tryout
if patience <= 0: ogInp = savedInputs().popleft(); patience = retries # ran out of patience, so gonna just return the canned result instead # tryout
else: patience -= 1 # tryout
# restart the loop # tryout
it = interceptIt([list(savedInputs()), ogIt] | cli.joinStreams()) # tryout
savedInputs.value = deque(); outIt = it | self.capturedSerial # tryout
if patience == retries: # tryout
yield str(e) if mode == "str" else (ogInp if mode == "input" else (traceback.format_exc() if mode == "traceback" else result)) # just resetted # tryout
return gen(it) # tryout
[docs] def __invert__(self): self.inverted = not self.inverted; return self # tryout
[docs]def resume(fn): # resume
"""Resumes a long-running operation. I usually have code that
looks like this::
def f(x): pass # long running, expensive calculation
ls(".") | applyMp(f) | apply(dill.dumps) | file("somefile.pth") # executing
cat.pickle("somefile.pth") | aS(list) # getting all of the saved objects
This will read all the files in the current directory, transforms
them using the long-running, expensive function, potentially doing
it in multiple processes. Then the results are serialized (turns into
bytes) and it will be appended to an output file.
What's frustrating is that I do stupid things all the time, so the
process usually gets interrupted. But I don't want to redo the
existing work, so that's where this cli comes into play. Now it looks
like this instead::
ls(".") | resume("somefile.pth") | applyMp(f) | apply(dill.dumps) >> file("somefile.pth")
Note that we're inserting a resume() AND changed the file write mode
to append, so that the file doesn't get overriden. Internally, this is
just a shorthand for ``~head(fn | (tryout(0) | aS(cat.pickle) | shape(0)))``
:param fn: file name""" # resume
return ~cli.head(fn | (cli.tryout(0) | cli.aS(cli.cat.pickle) | cli.shape(0))) # resume
[docs]class trigger(BaseCli): # trigger
[docs] def __init__(self, col=None, before=False): # yields whenever there's a delta # trigger
"""Yields elements whenever the requested column changes in value.
Example::
# returns [1, 2, 3, 4, 2]
[1, 1, 1, 2, 2, 3, 4, 4, 2, 2] | trigger() | cli.deref()
data = [[1, 0], [2, 1], [2, 2], [1, 3], [1, 4]]
# returns [[1, 0], [2, 1], [1, 3]]
data | trigger(0) | deref()
# returns [[1, 0], [2, 2], [1, 4]], note how on a trigger boundary, this returns the previous ("before") value, not the next value
data | trigger(0, True) | deref()
data = [[1, 0], [2, 2], [2, 2], [1, 3], [1, 4]]
# returns [[1, 0], [2, 2], [1, 3], [1, 4]]
data | trigger([0, 1]) | deref()
This takes inspiration from electrical engineering, where a capacitor is
essentially a "trigger detector". If the input voltage changes too rapidly,
the capacitor will make the output voltage spike up or down dramatically,
as if it's taking the derivative of the input signal. Slightly related is
the Schmitt trigger.
Anyway, this cli will track whether a column changes in value, and if it
does, yields the change, else ignore it. This is useful when processing
time series data, to discard adjacent messages that are effectively identical
and carries no extra information.
The parameter .col can be one of these 3 types:
- None (default): This considers the whole element/row for value change signal
- int: This considers a single column for value change signal
- list[int]: This considers a tuple of the selected columns for value change signal
Related, but not precisely the same operation include :class:`~k1lib.cli.filt.unique`
and ``aS(set)``
:param col: column to trigger
:param before: whether to take the previous or the next value when it
changes. Defaulted to next value (False)""" # trigger
self.col = col; self.before = before # trigger
[docs] def __ror__(self, it): # trigger
it = init.dfGuard(it); empty = object() # random sentinel # trigger
col = self.col; before = self.before; lastValue = empty; lastRow = empty # trigger
if col is not None: # trigger
if isinstance(col, int): # single column # trigger
if before: # trigger
for row in it: # trigger
row = list(row); value = row[col] # trigger
if value != lastValue: # trigger
if lastRow is not empty: yield lastRow # trigger
lastRow = row; lastValue = value # trigger
yield row # trigger
else: # trigger
for row in it: # trigger
row = list(row); value = row[col] # trigger
if value != lastValue: yield row; lastRow = row; lastValue = value # trigger
else: # list[int], multiple columns considered 1 # trigger
if before: # trigger
for row in it: # trigger
row = list(row); value = tuple(row[i] for i in col) # trigger
if value != lastValue: # trigger
if lastRow is not empty: yield lastRow # trigger
lastRow = row; lastValue = value # trigger
yield row # trigger
else: # trigger
for row in it: # trigger
row = list(row); value = tuple(row[i] for i in col) # trigger
if value != lastValue: yield row; lastRow = row; lastValue = value # trigger
else: # tales the whole thing in # trigger
if before: # trigger
for value in it: # trigger
if value != lastValue: # trigger
if lastValue is not empty: yield lastValue # trigger
lastValue = value # trigger
yield value # trigger
else: # trigger
for value in it: # trigger
if value != lastValue: yield value; lastValue = value # trigger
[docs]class filtStd(BaseCli): # filtStd
[docs] def __init__(self, col:int=None, std:float=2, N:int=1): # filtStd
"""Filters out values that is outside the specified standard deviation.
Example::
data = [*np.random.randn(100), *np.random.randn(10)*10] | randomize(None) | deref()
data | filtStd(std=2) | shape(0) # likely returns around 104
data | filtStd(std=0.1) | shape(0) # likely returns around 22
# column mode
data | apply(lambda x: ["a", x]) | filtStd(1, std=2) | shape(0) # likely returns around 104
# inverse mode. Will only take values that are outside the std range
data | ~filtStd(std=0.1) | shape(0) # likely returns around 88
:param col: column to extract the value out of
:param std: how many standard deviations above and below to accept the values
:param N: how many times to do this operation. ``filtStd(std=1, N=2)`` is equivalent to ``filtStd(std=1) | filtStd(std=1)``""" # filtStd
self.col = col; self.std = std; self.N = N; self.inverted = False # filtStd
[docs] def __invert__(self): res = filtStd(self.col, self.std); res.inverted = not self.inverted; return res # filtStd
[docs] def __ror__(self, it): # filtStd
if self.N != 1: # filtStd
f = filtStd(self.col, self.std, 1); f.inverted = self.inverted # filtStd
for i in range(self.N): it = it | f # filtStd
return it # filtStd
col = self.col; inv = self.inverted; fStd = self.std # filter std # filtStd
isPd = hasPandas and isinstance(it, pd.core.arraylike.OpsMixin) # filtStd
if col is None: # filtStd
if isinstance(it, k1lib.settings.cli.arrayTypes) or isPd: # filtStd
mean = it.mean(); std = it.std() # filtStd
minV = mean - std*fStd; maxV = mean + std*fStd # filtStd
if not inv: return it[(it >= minV) * (it <= maxV)] # filtStd
else: return it[(it < minV) + (it > maxV)] # filtStd
it = list(it); mean = it | cli.toMean(); std = it | cli.toStd() # filtStd
minV = mean - std*fStd; maxV = mean + std*fStd # filtStd
if not inv: return [v for v in it if minV <= v <= maxV] # filtStd
else: return [v for v in it if v < minV or v > maxV] # filtStd
else: # filtStd
if isinstance(it, k1lib.settings.cli.arrayTypes) or isPd: # filtStd
c = it[list(it)[col]] if isPd else it[:,col] # filtStd
mean = c.mean(); std = c.std() # filtStd
minV = mean - std*fStd; maxV = mean + std*fStd # filtStd
x = c if isPd else (c | cli.toMean().all()) # filtStd
if not inv: return it[(minV <= x) * (x <= maxV)] # filtStd
else: return it[(minV > x) + (x > maxV)] # filtStd
row, it = it | cli.peek() # filtStd
if it == []: return [] # filtStd
try: # sliceable? This optimization cause the input might have sliceable rows already, so don't turn them into lists, to gain perf # filtStd
if len(row): it[0] # filtStd
it = list(it) # filtStd
except: it = [list(row) for row in it] # filtStd
x = [row[col] for row in it] # filtStd
mean = x | cli.toMean(); std = x | cli.toStd() # filtStd
minV = mean - std*fStd; maxV = mean + std*fStd # filtStd
if not inv: return [row for row in it if minV <= row[col] <= maxV] # filtStd
else: return [row for row in it if minV > row[col] or row[col] > maxV] # filtStd