# Source code for k1lib.cli.init

# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
from typing import List, Iterator, Any, NewType, TypeVar, Generic
import k1lib.cli as cli; from numbers import Number
import k1lib, itertools, copy, xml, warnings, traceback, sys, random, ast, time; import numpy as np
from collections import deque
import xml.etree.ElementTree
try: import torch; hasTorch = True
except: hasTorch = False; torch = k1lib.dep.torch
try: import pandas as pd; pd.core; hasPandas = True
except: hasPandas = False
__all__ = ["BaseCli", "Table", "T", "fastF", "yieldT",
           "serial", "oneToMany", "mtmS"]
# Settings namespace of the cli module. It is registered on the global `k1lib.settings` under the key "cli" further down.
settings = k1lib.Settings()
# Separate namespace, nested under `settings.atomic`, that holds the tuples of types considered atomic.
atomic = k1lib.Settings()
settings.add("atomic", atomic, "classes/types that are considered atomic and specified cli tools should never try to iterate over them")
settings.add("defaultDelim", "\t", "default delimiter used in-between columns when creating tables. Defaulted to tab character.")
settings.add("defaultIndent", "  ", "default indent used for displaying nested structures")
settings.add("strict", False, "turning it on can help you debug stuff, but could also be a pain to work with")
settings.add("inf", float("inf"), "infinity definition for many clis. Here because you might want to temporarily not loop things infinitely")
k1lib.settings.add("cli", settings, "from k1lib.cli module")
# Unique sentinel object, exported through __all__.
# NOTE(review): nothing in this file consumes it; presumably other cli modules compare against it by identity -- confirm.
yieldT = object()
def patchDefaultDelim(st:str):
    """Resolves a possibly missing column delimiter.

:param st: delimiter requested by the caller, or None to ask for the default
:return: ``st`` unchanged when it is not None, otherwise the ``defaultDelim``
    value stored in this module's :attr:`~k1lib.settings`"""
    if st is not None: return st
    return settings.defaultDelim
def patchDefaultIndent(st:str):
    """Resolves a possibly missing indent string.

:param st: indent requested by the caller, or None to ask for the default
:return: ``st`` unchanged when it is not None, otherwise the ``defaultIndent``
    value stored in this module's :attr:`~k1lib.settings`"""
    if st is not None: return st
    return settings.defaultIndent
# Type variable used to parametrize the generic `Table` class defined below; also exported through __all__.
T = TypeVar("T")
"""Generic type variable"""
class _MetaType(type):
    """Metaclass behind :func:`newTypeHint`. Classes created by it can be
subscripted, and their repr shows the subscript (e.g. ``Table[int]``)."""
    def __getitem__(self, generic):
        """Returns a new class with the same name and docs, remembering ``generic`` under ``__args__``."""
        namespace = {"__args__": generic, "_n": self._n, "__doc__": self.__doc__}
        return _MetaType(self._n, (), namespace)
    def __repr__(self):
        """Renders the stored name, followed by the bracketed subscript if there is one."""
        def label(o):
            # nested hints are rendered recursively, anything else by its __name__, falling back to str()
            if isinstance(o, _MetaType): return render(o)
            try: return o.__name__
            except BaseException: return f"{o}"
        def render(klass):
            if not hasattr(klass, "__args__"): return klass._n
            args = klass.__args__
            if isinstance(args, tuple): inner = ', '.join([label(a) for a in args])
            else: inner = label(args)
            return f"{klass._n}[{inner}]"
        return render(self)
def newTypeHint(name, docs=""):
    """Creates a new type hint object that can be subscripted and still
prints out nicely, so that sphinx renders it like a regular generic type.
Implemented with a metaclass (:class:`_MetaType`), background reading:
https://stackoverflow.com/questions/100003/what-are-metaclasses-in-python

Example::

    Table = newTypeHint("Table", "some docs")
    Table[int] # prints out as "Table[int]"
    Table[Table[str], float] # prints out as "Table[Table[str], float]"

:param name: name shown in the repr
:param docs: docstring attached to the created type"""
    return _MetaType(name, (), dict(_n=name, __doc__=docs))
#Table = newTypeHint("Table", """Essentially just Iterator[List[T]]. This class is just here so that I can generate the docs with nicely formatted types like "Table[str]".""") # newTypeHint
#Table = NewType("Table", List)                                                  # newTypeHint
class Table(Generic[T]):
    """Shorthand for ``Iterator[List[T]]``. It only exists so that the generated docs can show nicely formatted types such as "Table[str]"."""
# NOTE(review): presumably `_name` is what the typing/sphinx machinery prints for this class -- confirm
Table._name = "Table"
#Table.__module__ = "cli"                                                        # Table
class Row(list):
    """Plain list subclass. Currently unused, reserved for a potential future feature."""
# Name generators used by the `_jsF` (JS transpiler) methods in this file: `_jsFAuto()` supplies
# the name of each generated JS function, `_jsDAuto()` the name of its data argument.
# NOTE(review): the random number and timestamp in the prefix presumably keep names from
# colliding across separately transpiled snippets -- confirm.
_jsFAuto = k1lib.AutoIncrement(prefix=f"_jsF_{random.randint(100, 999)}_{round(time.time())}_")
_jsDAuto = k1lib.AutoIncrement(prefix=f"_jsD_{random.randint(100, 999)}_{round(time.time())}_")
class ArrayOptException(Exception):
    """Exception type related to the array optimization mechanism.
    NOTE(review): never raised in this file; presumably used by other cli modules -- confirm."""
class BaseCli:
    """A base class for all the cli stuff. You can definitely create new cli tools that
have the same feel without extending from this class, but advanced stream operations
(like ``+``, ``&``, ``.all()``, ``|``) won't work.

At the moment, you don't have to call super().__init__() and super().__ror__(),
as __init__'s only job right now is to solidify any :class:`~k1lib.cli.modifier.op`
passed to it, and __ror__ does nothing."""
    def __init__(self, fs:list=None, capture=False):
        """Not expected to be instantiated by the end user.

**fs param**

Expected to use it like this::

    class A(BaseCli):
        def __init__(self, f):
            fs = [f]; super().__init__(fs); self.f = fs[0]

Where ``f`` is some (potentially exotic) function. This will replace f with a "normal"
function that's executable. See source code of :class:`~k1lib.cli.filt.filt` for an
example of why this is useful. Currently, it will:

- Replace with last recorded ``4 in op()``, if ``f`` is :data:`True`, because Python
  does not allow returning complex objects from __contains__ method
- Solidifies every :class:`~k1lib.cli.modifier.op`.

:param fs: list of functions to solidify. Must be a list (not a tuple), as it is
    refilled in place. None (the default) is treated as an empty list
:param capture: whether to capture all clis to the right of it and make it
    accessible under capturedClis and capturedSerial properties
:raises AttributeError: if ``fs`` is a tuple"""
        if isinstance(fs, tuple): raise AttributeError("`fs` should not be a tuple. Use a list instead, so that new functions can be returned")
        if fs is None: fs = [] # fresh list per call, instead of a mutable default shared between all calls
        # The `_k1_init_` prefixes are kept on purpose: frame locals end up in the namespace
        # built by `_k1_global_frame()`, so short names here could shadow user variables.
        _k1_init_l = []
        for _k1_init_f in fs:
            # this is supposed to turn the exotic function into a normal function and leave normal functions alone.
            # Purposefully don't do heavy optimizations here, cause we might want to poke around and change its internal representation
            # NOTE(review): solidify()'s return value is discarded, so this relies on it working in place -- confirm
            cli.op.solidify(_k1_init_f)
            _k1_init_l.append(_k1_init_f)
        fs.clear(); fs.extend(_k1_init_l) # refill the caller's list, keeping its identity
        self.capture = capture; self._capturedClis = []; self._capturedSerial = None
    @property
    def capturedClis(self):
        """Captured clis as a tuple. On first access every captured cli is passed
through ``op.solidify`` and the result is frozen into a tuple, which is then reused."""
        if isinstance(self._capturedClis, list):
            ans = []
            for e in self._capturedClis: ans.append(cli.op.solidify(e))
            self._capturedClis = tuple(ans)
        return self._capturedClis
    @property
    def capturedSerial(self):
        """All captured clis joined into a single :class:`serial`, built lazily and
cached. Returns None if this cli was not created with ``capture=True``."""
        if not self.capture: return None
        if self._capturedSerial is None: self._capturedSerial = serial(*self.capturedClis)
        return self._capturedSerial
    def hint(self, _hint:"cli.typehint.tBase"):
        """Specifies output type hint."""
        self._hint = _hint; return self
    @property
    def hasHint(self):
        """Whether a non-None type hint was set through :meth:`hint`."""
        return "_hint" in self.__dict__ and self._hint is not None
    def _typehint(self, inp:"cli.typehint.tBase"=None) -> "cli.typehint.tBase":
        """Returns the hint set through :meth:`hint`, or ``tAny()`` if none was set. ``inp`` is ignored here."""
        return cli.typehint.tAny() if "_hint" not in self.__dict__ else self._hint
    def __and__(self, cli:"BaseCli") -> "oneToMany":
        """Duplicates input stream to multiple joined clis. Example::

    # returns [[5], [0, 1, 2, 3, 4]]
    range(5) | (shape() & iden()) | deref()

Kinda like :class:`~k1lib.cli.modifier.apply`. There're just multiple ways of doing
this. This I think, is more intuitive, and :class:`~k1lib.cli.modifier.apply` is more
for lambdas and columns mode. Performances are pretty much identical."""
        # an existing oneToMany on either side is copied and extended instead of being nested
        if isinstance(self, oneToMany): return self._copy()._after(cli)
        if isinstance(cli, oneToMany): return cli._copy()._before(self)
        return oneToMany(self, cli)
    def __add__(self, cli:"BaseCli") -> "mtmS":
        """Parallel pass multiple streams to multiple clis. Example::

    # returns [8, 15]
    [2, 3] | ((op() * 4) + (op() * 5)) | deref()"""
        # an existing mtmS on either side is copied and extended instead of being nested
        if isinstance(self, mtmS): return self._copy()._after(cli)
        if isinstance(cli, mtmS): return cli._copy()._before(self)
        return mtmS(self, cli)
    def all(self, n:int=1) -> "BaseCli":
        """Applies this cli to all incoming streams. Example::

    # returns (3,)
    torch.randn(3, 4) | toMean().all() | shape()
    # returns (3, 4)
    torch.randn(3, 4, 5) | toMean().all(2) | shape()

:param n: how many times should I chain ``.all()``?
:raises AttributeError: if ``n`` is negative"""
        if n < 0: raise AttributeError(f"Does not make sense for `n` to be \"{n}\"")
        s = self
        for i in range(n): s = cli.apply(s)
        return s
    def __or__(self, cli_) -> "BaseCli": # cli is guaranteed (by typical usage, not law) that it's a BaseCli
        """Joins clis end-to-end. Example::

    c = apply(op() ** 2) | deref()
    # returns [0, 1, 4, 9, 16]
    range(5) | c"""
        # capturing clis swallow everything piped to their right instead of building a serial
        if not isinstance(self, cli.op) and hasattr(self, "capture") and self.capture:
            self._capturedClis.append(cli_); return self
        if isinstance(self, serial): return self._after(cli_)
        if isinstance(cli_, serial): return cli_._before(self)
        return serial(self, cli_)
    def __ror__(self, it):
        """Does nothing in the base class: always returns NotImplemented. Subclasses override it to process ``it``."""
        return NotImplemented
    def f(self):
        """Creates a normal function :math:`f(x)` which is equivalent to ``x | self``."""
        return lambda it: self.__ror__(it)
    def __lt__(self, it):
        """Backup pipe symbol `>`, purely for style, so that you can do something like this::

    range(4) > file("a.txt")"""
        return self.__ror__(it)
    def __call__(self, it, *args):
        """Another way to do ``it | cli``. If multiple arguments are fed, then the
argument list is passed to cli instead of just the first element. Example::

    @applyS
    def f(it): return it
    f(2) # returns 2
    f(2, 3) # returns [2, 3]"""
        if len(args) == 0: return self.__ror__(it)
        else: return self.__ror__([it, *args])
    def __neg__(self):
        """Alias for __invert__, for clis that support inverting stuff."""
        return ~self
    def _all_array_opt(self, it, level:int):
        """Array types optimization for ``operator.all(level)``. Essentially, a lot of
times, I'm trying to do ``array | op()[3].all()``, or ``array | transpose().all()``.
But without this optimization, that ``.all()`` function kinda loops through each
element and operates on them in vanilla Python, which is super slow. So, this is
a mechanism to speed it up.

Here's how it works::

    # you wrote this
    array | operator.all() | deref()
    # apply() detects that you're trying to operate on an array type. It then figures
    # out how many nested apply() levels are there. In this case it's 1, so apply() returns this instead
    operator._all_array_opt(array, 1)
    # if that throws an error or returns NotImplemented, then it'll just loop through the array normally

    # if you wrote this instead
    array | operator.all(3) | deref()
    # or this
    array | apply(apply(operator.all())) | deref()
    # apply() will try to execute this instead
    operator._all_array_opt(array, 3)

Also, if the operator is a complex one, made of an entire pipeline, then ``serial``
can break them apart and do this kind of optimization on each simple operator like this::

    operator = op()[3] | transpose()
    array | operator.all() | deref()
    # that gets transformed into this
    array | op()[3].all() | transpose().all() | deref()
    # then, array() will be called 2 times
    arr2 = op()[3]._all_array_opt(array, 1)
    transpose()._all_array_opt(arr2, 1)

It also works on something more complicated and nested like this::

    np.random.randn(3,4,5,6,7,8) | apply(transpose().all(3) | item()) | shape()

This breakdown also happens with op() (anticipated feature, not implemented yet)::

    array | op()[3][:4].all() | deref()
    # this will be broken down into
    array | op()[3].all() | op()[:4].all() | deref()
    # each piece will now have a chance to optimize the array structure independently,
    # so even if op()[:4] can't be done, op()[3] still have a chance to do the C-optimized version

Why don't I build a more standardized structure for these optimization passes? Well
I did, along the lines of LLVM. But, the whole optimization process kinda takes a
long time and I'm not sure if it's truly flexible for the kinds of workloads that
I'm thinking about. So, I'll just do this quick dumb optimization hack to get it over
with, and when I can think more clearly about this, I might move this mechanism back
to LLVM.

The base implementation performs no optimization and returns NotImplemented."""
        return NotImplemented
    def _jsF(self, meta):
        """JS transpiler default function. See "JS transpiler" section in the docs"""
        return NotImplemented
    def _pyF(self, meta):
        """Cli to Python transpiler default function."""
        return NotImplemented
    def _cppF(self, meta):
        """C++ transpiler default function"""
        return NotImplemented
    def _javaF(self, meta):
        """Java transpiler default function"""
        return NotImplemented
    def _sqlF(self, meta):
        """SQL transpiler default function"""
        return NotImplemented
def _k1_init_frames():
    """Yields the frames of the current call stack one by one, starting with
this generator's own frame and walking outwards, until the stack is exhausted."""
    # `_k1_` prefixed names on purpose: frame locals are later merged into an eval namespace
    _k1_init_frames_count = 0
    try:
        while True:
            yield sys._getframe(_k1_init_frames_count) # `sys._getframe()` trick stolen from pd.DataFrame.query
            _k1_init_frames_count += 1
    except ValueError: pass # sys._getframe() raises ValueError once the depth exceeds the call stack
def _k1_global_frame():
    """Merges the local variables of every frame on the call stack into one dict.
Frames are processed from the outermost to the innermost, so on a name clash
the innermost frame wins. Returns an empty dict if anything goes wrong."""
    try:
        _k1_init_frames_ans = {}
        for _k1_init_frames_frame in reversed(list(_k1_init_frames())):
            _k1_init_frames_ans = {**_k1_init_frames_ans, **_k1_init_frames_frame.f_locals}
        return _k1_init_frames_ans
    except Exception: return {} # best effort: callers only use this as an eval() namespace
def fastF(c, x=None):
    """Tries to figure out what's going on, is it a normal function, or an
applyS, or a BaseCli, etc., and return a really fast function for execution.
Example::

    # both returns 16, fastF returns "lambda x: x**2", so it's really fast
    fastF(op()**2)(4)
    fastF(applyS(lambda x: x**2))(4)

A string is also accepted: if it parses as a lambda expression it is evaluated
as-is, otherwise it is wrapped as ``lambda x: <string>``. Either way, it is
evaluated against the merged local variables of all frames on the call stack.

At the moment, parameter ``x`` does nothing, but potentially in the future,
you can pass in an example input to the cli, so that this returns an optimized,
C compiled version.

:param c: string, :class:`~k1lib.cli.modifier.op`, applyS, BaseCli or plain callable
:param x: sample data for the cli"""
    if isinstance(c, str):
        # WARNING: strings go through eval(), so they must never come from untrusted input.
        # `_k1_expr` keeps its prefix so it can't shadow a user variable inside the evaluated code.
        _k1_expr = ast.parse(c).body[0].value
        if isinstance(_k1_expr, ast.Lambda): return fastF(eval(c, _k1_global_frame()))
        else: return fastF(eval(f"lambda x: {c}", _k1_global_frame()))
    if isinstance(c, cli.op): return c.ab_fastF()
    if isinstance(c, cli.applyS):
        f = fastF(c.f)
        if len(c.args) == 0 and len(c.kwargs) == 0: return f
        # extra arguments stored on the applyS are injected; those passed at call time are ignored
        else: return lambda x, *args, **kwargs: f(x, *c.args, **c.kwargs)
    if isinstance(c, BaseCli): return c.__ror__
    return c # anything else is handed back untouched
def dfGuard(x):
    """If input is a pandas dataframe, then return a regular table instead, to
interopt well with clis. Of course, this is not very performant because this
will likely return an object array, which can't use C-accerated functions. If the
clis have a faster way of doing it then it shouldn't use this"""
    return x.to_numpy() if hasPandas and isinstance(x, pd.core.frame.DataFrame) else x
def preprocessPd(it, col:"int|None", f, farr=None):
    """Given either a series or a dataframe, a function and a column, return
f(it[:,col]) 1d numpy array. The vectorized call (``farr`` if given, else ``f``)
is tried on the whole series first. If that raises, ``f`` is applied element by
element and the results are packed into a numpy array.

:param it: pandas Series or DataFrame
:param col: column index. Must be None for a Series, and must be set for a DataFrame
:param f: eltwise operation. Can be vectorized
:param farr: explicitly vectorized operation. Optional
:raises ValueError: if ``col`` does not fit the kind of input"""
    ndim = 1 if isinstance(it, pd.core.series.Series) else len(it | cli.shape())
    if ndim == 1:
        if col is not None: raise ValueError("Can't apply to Series as .col is not None. Use a DataFrame or set .col to None")
        try: return (farr or f)(it)
        except Exception: return np.array([f(e) for e in it]) # vectorized call failed, fall back to eltwise
    if ndim >= 2:
        # message used to say ".col is not None", contradicting the condition that triggers it
        if col is None: raise ValueError("Can't apply to DataFrame as .col is None. Use a Series or set .col to some value")
        s = it[list(it)[col]] # list(it) gives the column labels, so this selects the col-th column
        try: return (farr or f)(s)
        except Exception: return np.array([f(e) for e in s]) # vectorized call failed, fall back to eltwise
def checkRor(c):
    """Makes sure ``c`` can be used inside a pipeline. BaseCli instances are
returned as-is, objects with a ``__ror__`` method or plain callables are wrapped
with ``cli.aS``, anything else raises an Exception."""
    if isinstance(c, BaseCli): return c
    if hasattr(c, "__ror__"): return cli.aS(c.__ror__)
    if callable(c): return cli.aS(c)
    raise Exception(f"Trying to add an operator to the pipeline, but the given object is not derived from BaseCli nor does it define a __ror__ method")
class serialRepeat(BaseCli):
    """Cli that applies function ``f`` to its input ``n`` times in a row. Created by :meth:`serial.repeat`."""
    def __init__(self, f, n:int):
        self.f = f; self.n = n; self._fC = fastF(f)
    def __ror__(self, it):
        f = self._fC
        for i in range(self.n): it = f(it)
        return it
    def _jsF(self, meta):
        """JS transpiler function. Returns a (js source, js function name) tuple."""
        f = self.f; fIdx = _jsFAuto(); dataIdx = _jsDAuto(); res = k1lib.kast.asyncGuard(k1lib.kast.prepareFunc3(f, meta))
        # report the function that failed to transpile. `cli` here would be the k1lib.cli module, not the culprit
        if res is NotImplemented: raise Exception(f"{f.__class__} can't be transpiled into js. Either it doesn't make sense, or it hasn't been built yet")
        header, fn, _async = res
        # whitespace inside the JS template below is not significant
        return f"""\
{header}\n{fIdx} = {'async ' if _async else ''}({dataIdx}) => {{ for (let i = 0; i < {self.n}; i++) {{ {dataIdx} = {'await ' if _async else ''}{fn}({dataIdx}); }} return {dataIdx}; }}""", fIdx
class serial(BaseCli):
    def __init__(self, *clis:List[BaseCli]):
        """Merges clis into 1, feeding end to end. Used in chaining clis
together without a prime iterator. Meaning, without this, stuff like this
fails to run::

    [1, 2] | a() | b() # runs
    c = a() | b(); [1, 2] | c # doesn't run if this class doesn't exist"""
        fs = [checkRor(c) for c in clis]; super().__init__(fs); self.clis = fs; self._runOpt()
    def _runOpt(self):
        """Caches whether any cli is a ``cli.trace``, and the fast function of every cli."""
        self._hasTrace = any(isinstance(c, cli.trace) for c in self.clis)
        self._cliCs = [fastF(c) for c in self.clis]; return self
    def _typehint(self, inp=None):
        """Threads the type hint through every cli in order, using ``tAny()`` whenever a cli returns a falsy hint."""
        for c in self.clis: inp = c._typehint(inp) or cli.typehint.tAny()
        return inp
    def __ror__(self, it:Iterator[Any]) -> Iterator[Any]:
        """Pipes ``it`` through all clis, in order."""
        if self._hasTrace: # slower, but tracable
            # `_clis_after_capture_analysis` is not assigned anywhere in this class, so reading it
            # directly raises AttributeError. Fall back to the plain clis when it's missing.
            for cli in getattr(self, "_clis_after_capture_analysis", self.clis): it = it | cli
        else: # faster, but not tracable
            for cli in self._cliCs: it = cli(it)
        return it
    def _before(self, c):
        """Returns a new serial with ``c`` put in front of the current clis."""
        return serial(checkRor(c), *self.clis)
    def _after (self, c):
        """Returns a new serial with ``c`` put after the current clis."""
        return serial(*self.clis, checkRor(c))
    def _jsF(self, meta):
        """JS transpiler function. Returns a (js source, js function name) tuple, where
the js function nests the calls to every transpiled cli."""
        headers = []; fns = []; asyncs = []; fIdx = _jsFAuto(); dataIdx = _jsDAuto()
        for cli in self.clis:
            res = k1lib.kast.asyncGuard(cli._jsF(meta))
            if res is NotImplemented: raise Exception(f"{cli.__class__} can't be transpiled into js. Either it doesn't make sense, or it hasn't been built yet")
            header, fn, _async = res
            headers.append(header); fns.append(fn); asyncs.append(_async)
        body = dataIdx
        for fn, _async in zip(fns, asyncs): body = f"{'await ' if _async else ''}{fn}({body})"
        return "\n".join(headers) + f"""\n{fIdx} = {'async ' if any(asyncs) else ''}({dataIdx}) => {{ return {body}; }};""", fIdx
    @staticmethod
    def repeat(f, n:int):
        """Executes this function over and over again for n times. Example::

    # returns 6561, or ((3^2)^2)^2
    3 | serial.repeat(op()**2, 3)

Of course, you can also do something like this::

    3 | serial(*[lambda x: x**2]*3)

And it would achieve the same result, but using this method, you can vary n
if you were to transpile it to JS.

:param f: function to execute
:param n: how many times to execute this function serially"""
        return serialRepeat(f, n)
# Types that `oneToMany.__ror__` passes to every cli as-is, instead of splitting them with itertools.tee()
atomic.add("baseAnd", (Number, np.number, str, dict, bool, bytes, list, tuple, *([torch.Tensor] if hasTorch else []), np.ndarray, xml.etree.ElementTree.Element), "used by BaseCli.__and__")
def addAtomic(klass):
    """Registers ``klass`` as an atomic type, by appending it to both ``atomic.baseAnd`` and ``atomic.deref``."""
    atomic.baseAnd = (*atomic.baseAnd, klass)
    # NOTE(review): `atomic.deref` is not registered in this file; presumably another cli module adds it -- confirm
    atomic.deref = (*atomic.deref, klass)
def _iterable(it):
    """Returns whether ``iter(it)`` succeeds."""
    try: iter(it); return True
    except Exception: return False # a custom __iter__ may raise anything, not just TypeError
class oneToMany(BaseCli):
    def __init__(self, *clis:List[BaseCli]):
        """Duplicates 1 stream into multiple streams, each for a cli in the
list. Used in the "a & b" joining operator. See also: :meth:`BaseCli.__and__`"""
        fs = [checkRor(c) for c in clis]; super().__init__(fs); self.clis = fs; self._cache()
    def _typehint(self, inp):
        """Collects the output hint of every cli for the same input hint (``tAny()`` for clis that fail)."""
        ts = []
        for f in self.clis:
            try: ts.append(f._typehint(inp))
            except Exception: ts.append(cli.typehint.tAny())
        return cli.typehint.tCollection(*ts).reduce()
    def __ror__(self, it:Iterator[Any]) -> Iterator[Iterator[Any]]:
        """Yields the result of every cli applied on ``it``. Atomic, ``splitSeek`` and
non-iterable inputs are handed to every cli directly, everything else is
duplicated with ``itertools.tee`` first."""
        # `cli` is a local loop variable in this method, hence the explicit `k1lib.cli.splitSeek`
        if isinstance(it, atomic.baseAnd) or isinstance(it, k1lib.cli.splitSeek) or not _iterable(it):
            for cli in self._cliCs: yield cli(it)
        else:
            its = itertools.tee(it, len(self.clis))
            for cli, it in zip(self._cliCs, its): yield cli(it)
    def _cache(self):
        """Rebuilds the cached fast functions from ``self.clis``."""
        self._cliCs = [fastF(c) for c in self.clis]; return self
    def _before(self, c):
        """Prepends ``c`` in place and returns self."""
        self.clis = [checkRor(c)] + self.clis; return self._cache()
    def _after(self, c):
        """Appends ``c`` in place and returns self."""
        self.clis = self.clis + [checkRor(c)]; return self._cache()
    def _copy(self):
        """Returns a new oneToMany over the same clis."""
        return oneToMany(*self.clis)
    def _jsF(self, meta):
        """JS transpiler function. Returns a (js source, js function name) tuple, where
the js function returns an array with the result of every transpiled cli."""
        headers = []; fns = []; asyncs = []; fIdx = _jsFAuto(); dataIdx = _jsDAuto()
        for cli in self.clis:
            res = k1lib.kast.asyncGuard(cli._jsF(meta))
            if res is NotImplemented: raise Exception(f"{cli.__class__} can't be transpiled into js. Either it doesn't make sense, or it hasn't been built yet")
            header, fn, _async = res
            headers.append(header); fns.append(fn); asyncs.append(_async)
        body = "[" + ", ".join([f"{'await ' if _async else ''}{fn}({dataIdx})" for fn, _async in zip(fns, asyncs)]) + "]"
        return "\n".join(headers) + f"""\n{fIdx} = {'async ' if any(asyncs) else ''}({dataIdx}) => {body};""", fIdx
class mtmS(BaseCli):
    def __init__(self, *clis:List[BaseCli]):
        """Applies multiple streams to multiple clis independently. Used in
the "a + b" joining operator. See also: :meth:`BaseCli.__add__`.

Weird name is actually a shorthand for "many to many specific"."""
        fs = [checkRor(c) for c in clis]; super().__init__(fs=fs); self.clis = fs; self._cache()
    def _inpTypeHintExpand(self, t):
        """Splits input hint ``t`` into one hint per cli: ``t.expand(n)`` for collection-like hints, ``tAny()`` for each cli otherwise."""
        n = len(self.clis);
        if isinstance(t, (cli.typehint.tCollection, *cli.typehint.tListIterSet, cli.typehint.tArrayTypes)): return t.expand(n)
        else: return [cli.typehint.tAny()]*n
    def _typehint(self, t):
        """Collects the output hint of every cli for its own input hint (``tAny()`` for clis that fail)."""
        n = len(self.clis); outTs = []
        for c, t in zip(self.clis, self._inpTypeHintExpand(t)):
            try: outTs.append(c._typehint(t))
            except Exception: outTs.append(cli.typehint.tAny())
        return cli.typehint.tCollection(*outTs).reduce()
    def _cache(self):
        """Rebuilds the cached fast functions from ``self.clis``."""
        self._cliCs = [fastF(c) for c in self.clis]; return self
    def _before(self, c):
        """Prepends ``c`` in place and returns self."""
        self.clis = [checkRor(c)] + self.clis; return self._cache()
    def _after (self, c):
        """Appends ``c`` in place and returns self."""
        self.clis = self.clis + [checkRor(c)]; return self._cache()
    def __ror__(self, its:Iterator[Any]) -> Iterator[Any]:
        """Yields the i-th cli applied on the i-th stream. Stops at whichever of clis/streams runs out first."""
        for cli, it in zip(self._cliCs, its): yield cli(it)
    @staticmethod
    def f(f, i:int, n:int=100):
        """Convenience method, so that this::

    mtmS(iden(), op()**2, iden(), iden(), iden())
    # also the same as this btw:
    (iden() + op()**2 + iden() + iden() + iden())

is the same as this::

    mtmS.f(op()**2, 1, 5)

Example::

    # returns [5, 36, 7, 8, 9]
    range(5, 10) | mtmS.f(op()**2, 1, 5) | deref()

:param i: where should I put the function?
:param n: how many clis in total? Defaulted to 100"""
        return mtmS(*([cli.iden()]*i + [f] + [cli.iden()]*(n-i-1)))
    def _copy(self):
        """Returns a new mtmS over the same clis."""
        return mtmS(*self.clis)
    def _jsF(self, meta):
        """JS transpiler function. Returns a (js source, js function name) tuple, where
the js function applies the i-th transpiled cli on the i-th element of its input."""
        headers = []; fns = []; asyncs = []; fIdx = _jsFAuto(); dataIdx = _jsDAuto()
        for cli in self.clis:
            res = k1lib.kast.asyncGuard(cli._jsF(meta))
            if res is NotImplemented: raise Exception(f"{cli.__class__} can't be transpiled into js. Either it doesn't make sense, or it hasn't been built yet")
            header, fn, _async = res
            headers.append(header); fns.append(fn); asyncs.append(_async)
        body = "[" + ", ".join([f"{'await ' if _async else ''}{fn}({dataIdx}[{i}])" for i, (fn, _async) in enumerate(zip(fns, asyncs))]) + "]"
        return "\n".join(headers) + f"""\n{fIdx} = {'async ' if any(asyncs) else ''}({dataIdx}) => {body};""", fIdx
def patchNumpy():
    """Patches numpy arrays and data types, so that piping like this work::

    a = np.random.randn(3)
    a | shape() # returns (3,)

Does nothing if the patch was already applied. If patching fails for any
reason (like ``forbiddenfruit`` not being installed), a warning is emitted
instead of raising."""
    if getattr(np, "_k1_patched", False): return
    try:
        import forbiddenfruit, inspect; #forbiddenfruit.reverse(np.ndarray, "__or__") # old version
        def _guardOr(oldOr):
            """Builds an ``__or__`` replacement around one specific original ``__or__``.
Going through a factory gives every patched type its own ``oldOr``. Defining the
wrapper directly inside the loop below would make all wrappers share a single
variable and thus call whatever ``__or__`` was captured last."""
            def _newOr(self, v):
                if isinstance(v, BaseCli): return NotImplemented # lets Python fall back to v.__ror__(self)
                try: return oldOr(self, v)
                except Exception: warnings.warn(traceback.format_exc())
            return _newOr
        forbiddenfruit.curse(np.ndarray, "__or__", _guardOr(np.ndarray.__or__))
        a = [getattr(np, dk) for dk in np.__dict__.keys()] # patching all numpy's numeric types
        # NOTE(review): integer types are skipped on purpose by the filter below; the reason is not visible here -- confirm
        for _type in [x for x in a if inspect.isclass(x) and issubclass(x, np.number) and not issubclass(x, np.integer)]:
            forbiddenfruit.curse(_type, "__or__", _guardOr(_type.__or__))
        np._k1_patched = True
    except Exception as e: warnings.warn(f"Tried to patch __or__ operator of built-in type `np.ndarray` but can't because: {e}")
# Original `__or__` implementations, captured at import time so that patchDict()'s replacement can still delegate to them.
dict_keys = type({"a": 3}.keys()); oldDKOr = dict_keys.__or__
dict_items = type({"a": 3}.items()); oldDIOr = dict_items.__or__
oldSetOr = set.__or__
def patchDict():
    """Patches dictionaries's items and keys, so that piping works::

    d = {"a": 3, "b": 4}
    d.keys() | deref() # returns ["a", "b"]
    d.items() | deref() # returns [["a", 3], ["b", 4]]

Does nothing if the patch was already applied. If patching fails for any
reason (like ``forbiddenfruit`` not being installed), a warning is emitted
instead of raising."""
    try:
        if np._k1_dict_patched: return # the "already patched" flag is stored on the numpy module
    except: pass
    try:
        import forbiddenfruit, traceback
        def _newDOr(self, v):
            """Why is this so weird? For some reason, if you patch dict_keys, you
will also patch dict_items. So, if you were to have 2 functions, one for each,
then they will override each other. The way forward is to have 1 single function
detect whether it's dict_keys or dict_items, and call the correct original function.

So why are there 2 curses? Well cause I'm lazy to check for this behavior in
multiple python versions, so just have 2 to make sure."""
            if isinstance(v, BaseCli): return NotImplemented # lets Python fall back to v.__ror__(self)
            try:
                # print(self, type(self), v, type(v))
                if isinstance(self, dict_keys): return oldDKOr(self, v)
                # this function is installed on dict_items too, but there used to be no branch for it,
                # so `d.items() | other` silently evaluated to None
                elif isinstance(self, dict_items): return oldDIOr(self, v)
                elif isinstance(self, dict):
                    if isinstance(v, dict_keys): return oldSetOr(set(self.keys()), set(v))
                    return oldDIOr(self, v)
                elif isinstance(self, set):
                    if isinstance(v, dict_keys): return oldSetOr(self, set(v))
                    return oldSetOr(self, v)
            except:
                print(self, type(self), v, type(v))
                warnings.warn(traceback.format_exc())
                return NotImplemented
            return NotImplemented # unknown receiver type: let Python try the reflected operation
        forbiddenfruit.curse(dict_keys, "__or__", _newDOr)
        forbiddenfruit.curse(dict_items, "__or__", _newDOr)
        np._k1_dict_patched = True
    except Exception as e: warnings.warn(f"Tried to patch __or__ operator of built-in type `dict_keys` and `dict_items` but can't because: {e}")