# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
from typing import List, Iterator, Any, NewType, TypeVar, Generic
import k1lib.cli as cli; from numbers import Number
import k1lib, itertools, copy, xml, warnings, traceback, sys, random, ast, time; import numpy as np
from collections import deque
import xml.etree.ElementTree
try: import torch; hasTorch = True
except: hasTorch = False; torch = k1lib.dep.torch
try: import pandas as pd; pd.core; hasPandas = True
except: hasPandas = False
# Names exported by `from <this module> import *`.
__all__ = ["BaseCli", "Table", "T", "fastF", "yieldT",
           "serial", "oneToMany", "mtmS"]
# `settings` holds the cli-wide options registered below; `atomic` is a nested
# settings group that is itself registered under the "atomic" key.
settings = k1lib.Settings()
atomic = k1lib.Settings()
settings.add("atomic", atomic, "classes/types that are considered atomic and specified cli tools should never try to iterate over them")
settings.add("defaultDelim", "\t", "default delimiter used in-between columns when creating tables. Defaulted to tab character.")
settings.add("defaultIndent", " ", "default indent used for displaying nested structures")
settings.add("strict", False, "turning it on can help you debug stuff, but could also be a pain to work with")
settings.add("inf", float("inf"), "infinity definition for many clis. Here because you might want to temporarily not loop things infinitely")
# Expose the whole group under the "cli" key of the library-wide settings object.
k1lib.settings.add("cli", settings, "from k1lib.cli module")
# Unique marker object; it is exported via __all__ but not used in this part of the file.
# NOTE(review): presumably a sentinel other cli modules compare against with `is` - confirm.
yieldT = object()
def patchDefaultDelim(st:str):
    """Resolves which column delimiter to use.

    :param st: delimiter requested by the caller, or None to fall back to the default
    :return: ``st`` itself when it is not None, otherwise the ``defaultDelim``
        value stored in this module's :attr:`~k1lib.settings`"""
    if st is not None: return st
    return settings.defaultDelim
def patchDefaultIndent(st:str):
    """Resolves which indent string to use.

    :param st: indent requested by the caller, or None to fall back to the default
    :return: ``st`` itself when it is not None, otherwise the ``defaultIndent``
        value stored in this module's :attr:`~k1lib.settings`"""
    if st is not None: return st
    return settings.defaultIndent
# Module-wide generic type variable (exported through __all__). The bare string
# statement right below it acts as its documentation string.
T = TypeVar("T")
"""Generic type variable"""
class _MetaType(type): # _MetaType
def __getitem__(self, generic): # _MetaType
d = {"__args__": generic, "_n": self._n, "__doc__": self.__doc__} # _MetaType
return _MetaType(self._n, (), d) # _MetaType
def __repr__(self): # _MetaType
def main(self): # _MetaType
def trueName(o): # _MetaType
if isinstance(o, _MetaType): return main(o) # _MetaType
try: return o.__name__ # _MetaType
except: return f"{o}" # _MetaType
if hasattr(self, "__args__"): # _MetaType
if isinstance(self.__args__, tuple): # _MetaType
return f"{self._n}[{', '.join([trueName(e) for e in self.__args__])}]" # _MetaType
else: return f"{self._n}[{trueName(self.__args__)}]" # _MetaType
return self._n # _MetaType
return main(self) # _MetaType
def newTypeHint(name, docs=""):
    """Creates a new type hint that can be subscripted and still prints
    nicely (sphinx picks up the pretty name too). Built on the
    :class:`_MetaType` metaclass; for background on metaclasses see
    https://stackoverflow.com/questions/100003/what-are-metaclasses-in-python
    Example::

        Table = newTypeHint("Table", "some docs")
        Table[int] # prints out as "Table[int]"
        Table[Table[str], float] # prints out as "Table[Table[str], float]"

    :param name: display name of the hint
    :param docs: docstring attached to the generated class"""
    namespace = {"_n": name, "__doc__": docs}
    return _MetaType(name, (), namespace)
#Table = newTypeHint("Table", """Essentially just Iterator[List[T]]. This class is just here so that I can generate the docs with nicely formatted types like "Table[str]".""") # newTypeHint
#Table = NewType("Table", List) # newTypeHint
class Table(Generic[T]):
    """Essentially just Iterator[List[T]]. This class is just here so that I can generate the docs with nicely formatted types like "Table[str]"."""
# display name override, set on the class after creation
setattr(Table, "_name", "Table")
#Table.__module__ = "cli" # Table
class Row(list):
    """Plain :class:`list` subclass. Not really used currently, just here for a potential future feature."""
# Name generators used by the `_jsF` transpiler methods in this file: `_jsFAuto()` names
# the emitted JS function and `_jsDAuto()` names its data argument. Each prefix embeds a
# random 3-digit number and the rounded import-time timestamp.
# NOTE(review): presumably that is to keep names from different sessions from colliding - confirm.
_jsFAuto = k1lib.AutoIncrement(prefix=f"_jsF_{random.randint(100, 999)}_{round(time.time())}_")
_jsDAuto = k1lib.AutoIncrement(prefix=f"_jsD_{random.randint(100, 999)}_{round(time.time())}_")
class ArrayOptException(Exception):
    """Exception type declared for the array optimization machinery; it is not raised in this part of the file."""
class BaseCli:
    """A base class for all the cli stuff. You can definitely create new cli tools that
    have the same feel without extending from this class, but advanced stream operations
    (like ``+``, ``&``, ``.all()``, ``|``) won't work.

    At the moment, you don't have to call super().__init__() and super().__ror__(),
    as __init__'s only job right now is to solidify any :class:`~k1lib.cli.modifier.op`
    passed to it, and __ror__ does nothing."""
    def __init__(self, fs:list=None, capture=False):
        """Not expected to be instantiated by the end user.

        **fs param**

        Expected to use it like this::

            class A(BaseCli):
                def __init__(self, f):
                    fs = [f]; super().__init__(fs); self.f = fs[0]

        Where ``f`` is some (potentially exotic) function. This will replace f with a "normal"
        function that's executable. See source code of :class:`~k1lib.cli.filt.filt` for an
        example of why this is useful. Currently, it will:

        - Replace with last recorded ``4 in op()``, if ``f`` is :data:`True`, because Python does
          not allow returning complex objects from __contains__ method
        - Solidifies every :class:`~k1lib.cli.modifier.op`.

        :param fs: list of functions to solidify. It is rewritten in place, so it must be a
            list. Omitting it (or passing None) means "no functions"
        :param capture: whether to capture all clis to the right of it and make it accessible under capturedClis and capturedSerial properties
        :raises AttributeError: if ``fs`` is a tuple"""
        # None sentinel instead of a shared `[]` default: the list is mutated below,
        # so a module-lifetime default object must never be handed out
        if fs is None: fs = []
        if isinstance(fs, tuple): raise AttributeError("`fs` should not be a tuple. Use a list instead, so that new functions can be returned")
        # local names keep their `_k1_init_` prefix on purpose: string lambdas are evaluated
        # against the merged locals of all live frames, so short names here could shadow user variables
        _k1_init_l = []
        for _k1_init_f in fs: cli.op.solidify(_k1_init_f); _k1_init_l.append(_k1_init_f) # this is supposed to turn the exotic function into a normal function and leave normal functions alone. Purposefully don't do heavy optimizations here, cause we might want to poke around and change its internal representation
        fs.clear(); fs.extend(_k1_init_l)
        self.capture = capture; self._capturedClis = []; self._capturedSerial = None
    @property
    def capturedClis(self):
        """Clis captured so far, solidified. The first access freezes the internal list into a tuple."""
        if isinstance(self._capturedClis, list):
            ans = []
            for e in self._capturedClis: ans.append(cli.op.solidify(e))
            self._capturedClis = tuple(ans)
        return self._capturedClis
    @property
    def capturedSerial(self):
        """:class:`serial` built from :attr:`capturedClis` (created once, then cached), or None if capturing is off."""
        if not self.capture: return None
        if self._capturedSerial is None: self._capturedSerial = serial(*self.capturedClis)
        return self._capturedSerial
    def hint(self, _hint:"cli.typehint.tBase"):
        """Specifies output type hint. Returns self, so calls can be chained."""
        self._hint = _hint; return self
    @property
    def hasHint(self):
        """True if :meth:`hint` has stored a non-None hint on this instance."""
        return "_hint" in self.__dict__ and self._hint is not None
    def _typehint(self, inp:"cli.typehint.tBase"=None) -> "cli.typehint.tBase":
        """Output type hint: the stored hint if there is one, else ``tAny()``. ``inp`` is ignored here."""
        return cli.typehint.tAny() if "_hint" not in self.__dict__ else self._hint
    def __and__(self, cli:"BaseCli") -> "oneToMany":
        """Duplicates input stream to multiple joined clis.
        Example::

            # returns [[5], [0, 1, 2, 3, 4]]
            range(5) | (shape() & iden()) | deref()

        Kinda like :class:`~k1lib.cli.modifier.apply`. There're just multiple ways of doing
        this. This I think, is more intuitive, and :class:`~k1lib.cli.modifier.apply` is more
        for lambdas and columns mode. Performances are pretty much identical."""
        # an existing oneToMany on either side is copied and extended instead of being nested
        if isinstance(self, oneToMany): return self._copy()._after(cli)
        if isinstance(cli, oneToMany): return cli._copy()._before(self)
        return oneToMany(self, cli)
    def __add__(self, cli:"BaseCli") -> "mtmS":
        """Parallel pass multiple streams to multiple clis.
        Example::

            # returns [8, 15]
            [2, 3] | ((op() * 4) + (op() * 5)) | deref()"""
        # an existing mtmS on either side is copied and extended instead of being nested
        if isinstance(self, mtmS): return self._copy()._after(cli)
        if isinstance(cli, mtmS): return cli._copy()._before(self)
        return mtmS(self, cli)
    def all(self, n:int=1) -> "BaseCli":
        """Applies this cli to all incoming streams.
        Example::

            # returns (3,)
            torch.randn(3, 4) | toMean().all() | shape()
            # returns (3, 4)
            torch.randn(3, 4, 5) | toMean().all(2) | shape()

        :param n: how many times should I chain ``.all()``?
        :raises AttributeError: if ``n`` is negative"""
        if n < 0: raise AttributeError(f"Does not make sense for `n` to be \"{n}\"")
        s = self
        for i in range(n): s = cli.apply(s)
        return s
    def __or__(self, cli_) -> "BaseCli": # cli is guaranteed (by typical usage, not law) that it's a BaseCli
        """Joins clis end-to-end.
        Example::

            c = apply(op() ** 2) | deref()
            # returns [0, 1, 4, 9, 16]
            range(5) | c"""
        # capturing clis (except `op`) swallow everything to their right instead of chaining
        if not isinstance(self, cli.op) and hasattr(self, "capture") and self.capture: self._capturedClis.append(cli_); return self
        if isinstance(self, serial): return self._after(cli_)
        if isinstance(cli_, serial): return cli_._before(self)
        return serial(self, cli_)
    def __ror__(self, it):
        """Default ``it | self``: not implemented here, subclasses provide the real behavior."""
        return NotImplemented
    def f(self):
        """Creates a normal function :math:`f(x)` which is equivalent to
        ``x | self``."""
        return lambda it: self.__ror__(it)
    def __lt__(self, it):
        """Backup pipe symbol `>`, purely for style, so that you can do something like
        this::

            range(4) > file("a.txt")"""
        return self.__ror__(it)
    def __call__(self, it, *args):
        """Another way to do ``it | cli``. If multiple arguments are fed, then the
        argument list is passed to cli instead of just the first element. Example::

            @applyS
            def f(it):
                return it
            f(2) # returns 2
            f(2, 3) # returns [2, 3]"""
        if len(args) == 0: return self.__ror__(it)
        else: return self.__ror__([it, *args])
    def __neg__(self):
        """Alias for __invert__, for clis that support inverting stuff."""
        return ~self
    def _all_array_opt(self, it, level:int):
        """Array types optimization for ``operator.all(level)``.

        Essentially, a lot of times, I'm trying to do ``array | op()[3].all()``,
        or ``array | transpose().all()``. But without this optimization, that ``.all()``
        function kinda loops through each element and operates on them in vanilla Python,
        which is super slow. So, this is a mechanism to speed it up. Here's how it works::

            # you wrote this
            array | operator.all() | deref()
            # apply() detects that you're trying to operate on an array type. It then figures
            # out how many nested apply() levels are there. In this case it's 1, so apply() returns this instead
            operator._all_array_opt(array, 1)
            # if that throws an error or returns NotImplemented, then it'll just loop through the array normally

            # if you wrote this instead
            array | operator.all(3) | deref()
            # or this
            array | apply(apply(operator.all())) | deref()
            # apply() will try to execute this instead
            operator._all_array_opt(array, 3)

        Also, if the operator is a complex one, made of an entire pipeline, then ``serial`` can break
        them apart and do this kind of optimization on each simple operator like this::

            operator = op()[3] | transpose()
            array | operator.all() | deref()
            # that gets transformed into this
            array | op()[3].all() | transpose().all() | deref()
            # then, array() will be called 2 times
            arr2 = op()[3]._all_array_opt(array, 1)
            transpose()._all_array_opt(arr2, 1)

        It also works on something more complicated and nested like this::

            # returns
            np.random.randn(3,4,5,6,7,8) | apply(transpose().all(3) | item()) | shape()

        This breakdown also happens with op() (anticipated feature, not implemented yet)::

            array | op()[3][:4].all() | deref()
            # this will be broken down into
            array | op()[3].all() | op()[:4].all() | deref()
            # each piece will now have a chance to optimize the array structure independently,
            # so even if op()[:4] can't be done, op()[3] still have a chance to do the C-optimized version

        Why don't I build a more standardized structure for these optimization passes? Well
        I did, along the lines of LLVM. But, the whole optimization process kinda takes a long
        time and I'm not sure if it's truly flexible for the kinds of workloads that I'm thinking
        about. So, I'll just do this quick dumb optimization hack to get it over with, and when
        I can think more clearly about this, I might move this mechanism back to LLVM.

        :return: :data:`NotImplemented` in this base class"""
        return NotImplemented
    def _jsF(self, meta):
        """JS transpiler default function. See "JS transpiler" section in the docs"""
        return NotImplemented
    def _pyF(self, meta):
        """Cli to Python transpiler default function."""
        return NotImplemented
    def _cppF(self, meta):
        """C++ transpiler default function"""
        return NotImplemented
    def _javaF(self, meta):
        """Java transpiler default function"""
        return NotImplemented
    def _sqlF(self, meta):
        """SQL transpiler default function"""
        return NotImplemented
def _k1_init_frames():
    """Yields the frames of the current call stack one at a time, starting at
    depth 0 and going outwards, and stops at the first depth for which
    ``sys._getframe`` raises (i.e. past the outermost frame).

    NOTE(review): depth 0 is this generator's own frame, so its locals are
    visible to any consumer that reads ``f_locals``. The long ``_k1_``-prefixed
    local name looks deliberate for that reason (avoids clashing with user
    variables) - confirm before renaming."""
    _k1_init_frames_count = 0 # current stack depth being requested
    try:
        while True:
            yield sys._getframe(_k1_init_frames_count) # `sys._getframe()` trick stolen from pd.DataFrame.query
            _k1_init_frames_count += 1
    except: pass # running off the top of the stack ends the generator; any other error is swallowed too
def _k1_global_frame():
    """Builds one flat dict out of the local variables of every frame on the
    call stack (as produced by :func:`_k1_init_frames`).

    Frames are merged from the outermost to the innermost, so when the same
    name exists in several frames, the innermost one wins. This function's own
    frame is part of the stack as well, hence the long ``_k1_init_frames_``
    prefixed local names. Returns an empty dict if anything goes wrong."""
    try:
        _k1_init_frames_ans = {}
        for _k1_init_frames_frame in reversed(list(_k1_init_frames())): # outermost frame first
            _k1_init_frames_ans = {**_k1_init_frames_ans, **_k1_init_frames_frame.f_locals} # later (inner) frames override earlier ones
        return _k1_init_frames_ans
    except: return {} # best effort: never let namespace collection fail the caller
def fastF(c, x=None):
    """Figures out what kind of thing ``c`` is (source string, ``op``, ``applyS``,
    other :class:`BaseCli`, or plain callable) and returns a function that is
    as cheap as possible to call. Example::

        # both returns 16, fastF returns "lambda x: x**2", so it's really fast
        fastF(op()**2)(4)
        fastF(applyS(lambda x: x**2))(4)

    Strings are evaluated as Python source: a string that parses to a lambda
    is used as-is, anything else is wrapped as ``lambda x: <string>``.

    At the moment, parameter ``x`` does nothing, but potentially in the future, you can
    pass in an example input to the cli, so that this returns an optimized, C compiled
    version.

    :param c: the thing to turn into a fast function
    :param x: sample data for the cli (currently unused)"""
    if isinstance(c, str):
        # NOTE(review): this eval()s the given string against the merged locals of every live
        # frame - only ever feed it trusted source text.
        # Local name kept as `_k1_expr`: this frame's locals end up in that namespace too.
        _k1_expr = ast.parse(c).body[0].value
        return fastF(eval(c if isinstance(_k1_expr, ast.Lambda) else f"lambda x: {c}", _k1_global_frame()))
    if isinstance(c, cli.op): return c.ab_fastF()
    if isinstance(c, cli.applyS):
        f = fastF(c.f)
        # extra arguments stored on the applyS are baked into the returned function
        if len(c.args) != 0 or len(c.kwargs) != 0:
            return lambda x, *args, **kwargs: f(x, *c.args, **c.kwargs)
        return f
    return c.__ror__ if isinstance(c, BaseCli) else c
def dfGuard(x):
    """Converts a pandas DataFrame into a numpy array (``x.to_numpy()``) so that it
    interoperates with clis as a regular table; every other input is returned as-is.

    Of course, this is not very performant because this will likely return an object
    array, which can't use C-accelerated functions. Clis that have a faster way of
    handling dataframes shouldn't use this."""
    if hasPandas and isinstance(x, pd.core.frame.DataFrame):
        return x.to_numpy()
    return x
def preprocessPd(it, col:"int|None", f, farr=None):
    """Given either a series or a dataframe, a function and a column, return f(it[:,col]) 1d numpy array.

    The vectorized call (``farr`` if given, else ``f``) is tried on the whole
    series first. If that raises, ``f`` is applied element by element and the
    results are packed into a numpy array.

    :param it: pandas Series (``col`` must be None) or DataFrame (``col`` must be set)
    :param col: positional index of the column to operate on, for DataFrames
    :param f: eltwise operation. Can be vectorized
    :param farr: explicitly vectorized operation. Optional
    :raises ValueError: if ``col`` does not fit the kind of input (see above)"""
    ndim = 1 if isinstance(it, pd.core.series.Series) else len(it | cli.shape())
    if ndim == 1:
        if col is not None: raise ValueError("Can't apply to Series as .col is not None. Use a DataFrame or set .col to None")
        try: return (farr or f)(it)
        except Exception: return np.array([f(e) for e in it]) # vectorized attempt failed, fall back to eltwise
    if ndim >= 2:
        # message fixed: this guard fires when .col IS None (it used to claim the opposite)
        if col is None: raise ValueError("Can't apply to DataFrame as .col is None. Use a Series or set .col to some value")
        s = it[list(it)[col]] # list(df) gives the column labels, so `col` is positional
        try: return (farr or f)(s)
        except Exception: return np.array([f(e) for e in s]) # vectorized attempt failed, fall back to eltwise
def checkRor(c):
    """Makes sure ``c`` can take part in a pipeline.

    :class:`BaseCli` instances pass through untouched. Objects that define
    ``__ror__`` are wrapped (via that method) into ``cli.aS``, and plain
    callables are wrapped into ``cli.aS`` directly.

    :raises Exception: if ``c`` is none of the above"""
    if isinstance(c, BaseCli): return c
    if hasattr(c, "__ror__"): wrapped = c.__ror__
    elif callable(c): wrapped = c
    else: raise Exception(f"Trying to add an operator to the pipeline, but the given object is not derived from BaseCli nor does it define a __ror__ method")
    return cli.aS(wrapped)
class serialRepeat(BaseCli):
    """Cli that feeds its input through the same function ``n`` times in a row.
    Created by :meth:`serial.repeat`."""
    def __init__(self, f, n:int):
        """:param f: function to execute
        :param n: how many times to execute this function serially"""
        self.f = f; self.n = n; self._fC = fastF(f)
    def __ror__(self, it):
        """Applies the (pre-compiled) function ``n`` times, each time on the previous result."""
        f = self._fC
        for i in range(self.n): it = f(it)
        return it
    def _jsF(self, meta):
        """JS transpiler: emits a JS function that calls the transpiled ``f`` in a
        ``for`` loop ``n`` times. Returns ``(js source, name of the emitted function)``.

        :raises Exception: if ``f`` can't be transpiled"""
        f = self.f; fIdx = _jsFAuto(); dataIdx = _jsDAuto(); res = k1lib.kast.asyncGuard(k1lib.kast.prepareFunc3(f, meta))
        # report the offending function's class; this used to format `cli.__class__`, where `cli`
        # is the imported module, so the message always said <class 'module'>
        if res is NotImplemented: raise Exception(f"{f.__class__} can't be transpiled into js. Either it doesn't make sense, or it hasn't been built yet")
        header, fn, _async = res
        return f"""\
{header}\n{fIdx} = {'async ' if _async else ''}({dataIdx}) => {{
for (let i = 0; i < {self.n}; i++) {{
{dataIdx} = {'await ' if _async else ''}{fn}({dataIdx});
}}
return {dataIdx};
}}""", fIdx
class serial(BaseCli):
    """Chain of clis executed end to end. This is what ``a | b`` builds when neither side is a prime iterator."""
    def __init__(self, *clis:List[BaseCli]):
        """Merges clis into 1, feeding end to end. Used in chaining clis
        together without a prime iterator. Meaning, without this, stuff like this
        fails to run::

            [1, 2] | a() | b() # runs
            c = a() | b(); [1, 2] | c # doesn't run if this class doesn't exist"""
        fs = [checkRor(c) for c in clis]; super().__init__(fs); self.clis = fs; self._runOpt()
    def _runOpt(self):
        """Precomputes the fast path: whether any cli is a ``trace``, and the fast function of each cli. Returns self."""
        self._hasTrace = any(isinstance(c, cli.trace) for c in self.clis)
        self._cliCs = [fastF(c) for c in self.clis]; return self
    def _typehint(self, inp=None):
        """Threads the type hint through every cli in order, using ``tAny()`` whenever a cli gives back a falsy hint."""
        for c in self.clis: inp = c._typehint(inp) or cli.typehint.tAny()
        return inp
    def __ror__(self, it:Iterator[Any]) -> Iterator[Any]:
        """Pipes ``it`` through every cli in order and returns the final result."""
        if self._hasTrace: # slower, but tracable
            # `_clis_after_capture_analysis` is not set anywhere in this class, so reading it
            # unconditionally made every traced pipeline fail; use it only if something
            # provided it, and the plain cli list otherwise
            for cli in getattr(self, "_clis_after_capture_analysis", self.clis): it = it | cli
        else: # faster, but not tracable
            for cli in self._cliCs: it = cli(it)
        return it
    def _before(self, c):
        """New :class:`serial` with ``c`` put in front of the current clis."""
        return serial(checkRor(c), *self.clis)
    def _after (self, c):
        """New :class:`serial` with ``c`` appended to the current clis."""
        return serial(*self.clis, checkRor(c))
    def _jsF(self, meta):
        """JS transpiler: emits one JS function that nests the calls of all transpiled
        clis. Returns ``(js source, name of the emitted function)``.

        :raises Exception: if any of the clis can't be transpiled"""
        headers = []; fns = []; asyncs = []; fIdx = _jsFAuto(); dataIdx = _jsDAuto()
        for cli in self.clis:
            res = k1lib.kast.asyncGuard(cli._jsF(meta))
            if res is NotImplemented: raise Exception(f"{cli.__class__} can't be transpiled into js. Either it doesn't make sense, or it hasn't been built yet")
            header, fn, _async = res
            headers.append(header); fns.append(fn); asyncs.append(_async)
        body = dataIdx
        for fn, _async in zip(fns, asyncs): body = f"{'await ' if _async else ''}{fn}({body})" # innermost call is the first cli
        return "\n".join(headers) + f"""\n{fIdx} = {'async ' if any(asyncs) else ''}({dataIdx}) => {{ return {body}; }};""", fIdx
    @staticmethod
    def repeat(f, n:int):
        """Executes this function over and over again for n times.
        Example::

            # returns 6561, or ((3^2)^2)^2
            3 | serial.repeat(op()**2, 3)

        Of course, you can also do something like this::

            3 | serial(*[lambda x: x**2]*3)

        And it would achieve the same result, but using this method, you can
        vary n if you were to transpile it to JS.

        :param f: function to execute
        :param n: how many times to execute this function serially"""
        return serialRepeat(f, n)
# Types that `oneToMany.__ror__` hands to every branch as-is instead of splitting them with
# itertools.tee(). torch.Tensor is only included when torch could be imported.
atomic.add("baseAnd", (Number, np.number, str, dict, bool, bytes, list, tuple, *([torch.Tensor] if hasTorch else []), np.ndarray, xml.etree.ElementTree.Element), "used by BaseCli.__and__")
def addAtomic(klass):
    """Registers ``klass`` as an atomic type by appending it to both the
    ``atomic.baseAnd`` and ``atomic.deref`` tuples (in that order).

    :param klass: class/type that clis should treat as atomic"""
    for name in ("baseAnd", "deref"):
        setattr(atomic, name, (*getattr(atomic, name), klass))
def _iterable(it): # _iterable
try: iter(it); return True # _iterable
except: return False # _iterable
class oneToMany(BaseCli):
    """Fan-out cli: every contained cli receives (a copy of) the same input stream."""
    def __init__(self, *clis:List[BaseCli]):
        """Duplicates 1 stream into multiple streams, each for a cli in the
        list. Used in the "a & b" joining operator. See also: :meth:`BaseCli.__and__`"""
        fs = [checkRor(c) for c in clis]
        super().__init__(fs)
        self.clis = fs
        self._cache()
    def _typehint(self, inp):
        """Collection of the output hints of every cli for input hint ``inp``;
        a cli whose hinting fails contributes ``tAny()``. The collection is reduced before returning."""
        ts = []
        for f in self.clis:
            try: t = f._typehint(inp)
            except: t = cli.typehint.tAny()
            ts.append(t)
        return cli.typehint.tCollection(*ts).reduce()
    def __ror__(self, it:Iterator[Any]) -> Iterator[Iterator[Any]]:
        """Lazily yields the result of each cli on ``it``. Real iterables are split with
        :func:`itertools.tee`; atomic types, ``splitSeek`` objects and non-iterables are
        passed to every cli directly."""
        if not isinstance(it, atomic.baseAnd) and not isinstance(it, k1lib.cli.splitSeek) and _iterable(it):
            its = itertools.tee(it, len(self.clis)) # one independent iterator per cli
            for cli, it in zip(self._cliCs, its): yield cli(it)
        else:
            for cli in self._cliCs: yield cli(it)
    def _cache(self):
        """(Re)builds the list of fast functions from ``self.clis``. Returns self."""
        self._cliCs = [fastF(c) for c in self.clis]
        return self
    def _before(self, c):
        """Prepends ``c`` in place and refreshes the cache. Returns self."""
        self.clis = [checkRor(c)] + self.clis
        return self._cache()
    def _after(self, c):
        """Appends ``c`` in place and refreshes the cache. Returns self."""
        self.clis = self.clis + [checkRor(c)]
        return self._cache()
    def _copy(self):
        """New :class:`oneToMany` over the same clis."""
        return oneToMany(*self.clis)
    def _jsF(self, meta):
        """JS transpiler: emits a JS function returning an array with the result of each
        transpiled cli on the same input. Returns ``(js source, name of the emitted function)``.

        :raises Exception: if any of the clis can't be transpiled"""
        headers = []; fns = []; asyncs = []
        fIdx = _jsFAuto(); dataIdx = _jsDAuto()
        for cli in self.clis:
            res = k1lib.kast.asyncGuard(cli._jsF(meta))
            if res is NotImplemented:
                raise Exception(f"{cli.__class__} can't be transpiled into js. Either it doesn't make sense, or it hasn't been built yet")
            header, fn, _async = res
            headers.append(header); fns.append(fn); asyncs.append(_async)
        calls = [f"{'await ' if _async else ''}{fn}({dataIdx})" for fn, _async in zip(fns, asyncs)]
        body = "[" + ", ".join(calls) + "]"
        return "\n".join(headers) + f"""\n{fIdx} = {'async ' if any(asyncs) else ''}({dataIdx}) => {body};""", fIdx
class mtmS(BaseCli):
    """Pairs up incoming streams with clis: the i-th stream goes through the i-th cli."""
    def __init__(self, *clis:List[BaseCli]):
        """Applies multiple streams to multiple clis independently. Used in
        the "a + b" joining operator. See also: :meth:`BaseCli.__add__`.

        Weird name is actually a shorthand for "many to many specific"."""
        fs = [checkRor(c) for c in clis]
        super().__init__(fs=fs)
        self.clis = fs
        self._cache()
    def _inpTypeHintExpand(self, t):
        """Splits input hint ``t`` into one hint per cli: container-like hints are
        expanded with ``t.expand(n)``, anything else becomes ``n`` times ``tAny()``."""
        n = len(self.clis)
        if not isinstance(t, (cli.typehint.tCollection, *cli.typehint.tListIterSet, cli.typehint.tArrayTypes)):
            return [cli.typehint.tAny()]*n
        return t.expand(n)
    def _typehint(self, t):
        """Collection of each cli's output hint for its share of the input hint;
        a cli whose hinting fails contributes ``tAny()``. The collection is reduced before returning."""
        outTs = []
        for c, t in zip(self.clis, self._inpTypeHintExpand(t)):
            try: out = c._typehint(t)
            except: out = cli.typehint.tAny()
            outTs.append(out)
        return cli.typehint.tCollection(*outTs).reduce()
    def _cache(self):
        """(Re)builds the list of fast functions from ``self.clis``. Returns self."""
        self._cliCs = [fastF(c) for c in self.clis]
        return self
    def _before(self, c):
        """Prepends ``c`` in place and refreshes the cache. Returns self."""
        self.clis = [checkRor(c)] + self.clis
        return self._cache()
    def _after (self, c):
        """Appends ``c`` in place and refreshes the cache. Returns self."""
        self.clis = self.clis + [checkRor(c)]
        return self._cache()
    def __ror__(self, its:Iterator[Any]) -> Iterator[Any]:
        """Lazily yields ``cli_i(stream_i)``, stopping at the shorter of clis and streams."""
        for cli, it in zip(self._cliCs, its): yield cli(it)
    @staticmethod
    def f(f, i:int, n:int=100):
        """Convenience method, so
        that this::

            mtmS(iden(), op()**2, iden(), iden(), iden())
            # also the same as this btw:
            (iden() + op()**2 + iden() + iden() + iden())

        is the same as this::

            mtmS.f(op()**2, 1, 5)

        Example::

            # returns [5, 36, 7, 8, 9]
            range(5, 10) | mtmS.f(op()**2, 1, 5) | deref()

        :param i: where should I put the function?
        :param n: how many clis in total? Defaulted to 100"""
        before = [cli.iden()]*i
        after = [cli.iden()]*(n-i-1)
        return mtmS(*(before + [f] + after))
    def _copy(self):
        """New :class:`mtmS` over the same clis."""
        return mtmS(*self.clis)
    def _jsF(self, meta):
        """JS transpiler: emits a JS function returning an array where element ``i`` is the
        i-th transpiled cli applied to element ``i`` of the input. Returns
        ``(js source, name of the emitted function)``.

        :raises Exception: if any of the clis can't be transpiled"""
        headers = []; fns = []; asyncs = []
        fIdx = _jsFAuto(); dataIdx = _jsDAuto()
        for cli in self.clis:
            res = k1lib.kast.asyncGuard(cli._jsF(meta))
            if res is NotImplemented:
                raise Exception(f"{cli.__class__} can't be transpiled into js. Either it doesn't make sense, or it hasn't been built yet")
            header, fn, _async = res
            headers.append(header); fns.append(fn); asyncs.append(_async)
        calls = [f"{'await ' if _async else ''}{fn}({dataIdx}[{i}])" for i, (fn, _async) in enumerate(zip(fns, asyncs))]
        body = "[" + ", ".join(calls) + "]"
        return "\n".join(headers) + f"""\n{fIdx} = {'async ' if any(asyncs) else ''}({dataIdx}) => {body};""", fIdx
def patchNumpy():
    """Patches numpy arrays and data types, so that piping like
    this work::

        a = np.random.randn(3)
        a | shape() # returns (3,)

    The ``__or__`` of ``np.ndarray`` and of every non-integer ``np.number``
    subclass is replaced by a wrapper that returns :data:`NotImplemented` when
    the right operand is a :class:`BaseCli` (so Python falls back to the cli's
    ``__ror__``) and defers to the original ``__or__`` otherwise.

    Does nothing if numpy has been patched already. Any failure to patch (for
    example ``forbiddenfruit`` not being installed) is reported as a warning
    instead of an exception."""
    if getattr(np, "_k1_patched", False): return
    try:
        import forbiddenfruit, inspect; #forbiddenfruit.reverse(np.ndarray, "__or__") # old version
        def _guardedOr(oldOr):
            """Builds the replacement ``__or__`` around one specific original implementation.
            A factory is needed so that each patched type keeps *its own* original: a closure
            created directly in the loop below would see only the last type's ``__or__``."""
            def _newOr(self, v):
                if isinstance(v, BaseCli): return NotImplemented
                try: return oldOr(self, v)
                except Exception: warnings.warn(traceback.format_exc()) # best effort, as before: warn and give back None
            return _newOr
        forbiddenfruit.curse(np.ndarray, "__or__", _guardedOr(np.ndarray.__or__))
        a = [getattr(np, dk) for dk in np.__dict__.keys()] # patching all numpy's numeric types
        for _type in [x for x in a if inspect.isclass(x) and issubclass(x, np.number) and not issubclass(x, np.integer)]:
            forbiddenfruit.curse(_type, "__or__", _guardedOr(_type.__or__))
        np._k1_patched = True
    except Exception as e: warnings.warn(f"Tried to patch __or__ operator of built-in type `np.ndarray` but can't because: {e}")
# The dict view types are not importable by name, so they are taken from throwaway views.
# The original `__or__` implementations are saved at import time (before patchDict() can
# replace them) so that the patched operator can still delegate to them.
dict_keys = type({"a": 3}.keys()); oldDKOr = dict_keys.__or__
dict_items = type({"a": 3}.items()); oldDIOr = dict_items.__or__
oldSetOr = set.__or__
def patchDict():
    """Patches dictionaries's items and keys, so that piping
    works::

        d = {"a": 3, "b": 4}
        d.keys() | deref() # returns ["a", "b"]
        d.items() | deref() # returns [["a", 3], ["b", 4]]

    Does nothing if the patch has been applied already (tracked by the
    ``np._k1_dict_patched`` flag). Any failure to patch (for example
    ``forbiddenfruit`` not being installed) is reported as a warning instead
    of an exception."""
    if getattr(np, "_k1_dict_patched", False): return
    try:
        import forbiddenfruit, traceback
        def _newDOr(self, v):
            """Why is this so weird? For some reason, if you patch dict_keys, you will
            also patch dict_items. So, if you were to have 2 functions, one for each,
            then they will override each other. The way forward is to have 1 single
            function detect whether it's dict_keys or dict_items, and call the correct
            original function. So why are there 2 curses? Well cause I'm lazy to check
            for this behavior in multiple python versions, so just have 2 to make sure."""
            if isinstance(v, BaseCli): return NotImplemented # let the cli's __ror__ handle the pipe
            try:
                if isinstance(self, dict_keys): return oldDKOr(self, v)
                # a dict_items left operand used to match no branch at all, so `d.items() | x`
                # never reached the original implementation
                elif isinstance(self, dict_items): return oldDIOr(self, v)
                elif isinstance(self, dict):
                    if isinstance(v, dict_keys): return oldSetOr(set(self.keys()), set(v))
                    return oldDIOr(self, v)
                elif isinstance(self, set):
                    if isinstance(v, dict_keys): return oldSetOr(self, set(v))
                    return oldSetOr(self, v)
            except Exception:
                # operand details go into the warning instead of a stray print() to stdout
                warnings.warn(f"{self!r} {type(self)} {v!r} {type(v)}\n{traceback.format_exc()}")
            # unknown operand combination, or the original operator failed: let Python try other options
            return NotImplemented
        forbiddenfruit.curse(dict_keys, "__or__", _newDOr)
        forbiddenfruit.curse(dict_items, "__or__", _newDOr)
        np._k1_dict_patched = True
    except Exception as e: warnings.warn(f"Tried to patch __or__ operator of built-in type `dict_keys` and `dict_items` but can't because: {e}")