# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
"""
This is for quick modifiers, think of them as changing formats, or modifying an
array in place
"""
__all__ = ["applyS", "aS", "iterDelay", "apply", "applyMp", "parallel", "applyCl",
"applyTh", "applySerial",
"sort", "sortF", "consume", "randomize", "stagger", "op",
"integrate", "roll", "clamp"]
from typing import Callable, Iterator, Any, Union, List, Tuple
from k1lib.cli.init import patchDefaultDelim, BaseCli, fastF; import k1lib.cli.init as init
import k1lib.cli as cli, numpy as np, threading, gc, random, k1lib
from collections import deque, defaultdict; from functools import partial, update_wrapper, lru_cache
from k1lib.cli.typehint import *; requests = k1lib.dep.requests
import dill, pickle, json, k1lib, warnings, atexit, signal, time, os, random, sys
try: import torch; import torch.multiprocessing as mp; hasTorch = True
except: import multiprocessing as mp; hasTorch = False
try: import ray; hasRay = True
except: hasRay = False
try: import pandas as pd; pd.core; hasPandas = True
except: hasPandas = False
settings = k1lib.settings.cli
class applyS(BaseCli): # applyS
blurb="Applies a function to the input in pipe style" # applyS
def __init__(self, f:Callable[[Any], Any], *args, **kwargs): # applyS
"""Like :class:`apply`, but much simpler, just operating on the entire input
object, essentially. The "S" stands for "single". There's
also an alias shorthand for this called :class:`aS`. Example::
# returns 5
3 | aS(lambda x: x+2)
Like :class:`apply`, you can also use this as a decorator like this::
@aS
def f(x):
return x+2
# returns 5
3 | f
This also decorates the returned object so that it has the same qualname, docstring
and whatnot.
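Because :func:`functools.update_wrapper` is used internally, something like this
should hold (``addTwo`` here is just a hypothetical function)::
    def addTwo(x): return x+2
    # returns 'addTwo', i.e. the metadata of the original function is preserved
    aS(addTwo).__name__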
.. admonition:: Shorthands
Writing out "lambda x:" all the time is annoying, and there are ways
to quickly say ``lambda x: x+2`` like so::
3 | op()+2 # returns 5
3 | aS("x+2") # returns 5. Behind the scenes, it compiles and execute `lambda x: x+2`
The first way is to use :class:`op`, that will absorb all operations done on it,
like "+", and returns a function that essentially replays all the operations.
In the second way, you only have to pass in the string containing code that you want
done on the variable "x". Then internally, it will compile to regular Python code.
In fact, you can pass in ``op()`` or just a string to any cli that accepts any kind
of function, like :class:`~k1lib.cli.filt.filt` or :class:`apply`::
range(4) | apply("x-2") | deref()
range(4) | apply(op()-2) | deref()
range(4) | filt("x%2") | deref()
range(4) | filt(op()%2) | deref()
:param f: the function to be executed
:param kwargs: other keyword arguments to pass to the function, together with ``args``""" # applyS
super().__init__(fs=[f]); self.args = args; self.kwargs = kwargs # applyS
self.f = f; self._fC = fastF(f); update_wrapper(self, f, updated=()) # applyS
self.inverted = False; self.preInvAS = None # applyS
def _typehint(self, inp): # applyS
if self.hasHint: return self._hint # applyS
try: return self.f._typehint(inp) # applyS
except: return tAny() # applyS
def __ror__(self, it:Any) -> Any: return self._fC(it, *self.args, **self.kwargs) # applyS
def _all_array_opt(self, it, level): # applyS
res = None; ogShape = tuple(it.shape[:level]) # applyS
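# rough idea: treat the first `level` axes as batch dims - rotate them to the back, run f once over the whole array, then rotate the result's trailing axes back to the front; bail out (NotImplemented) if that changed the batch shape # applyS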
if isinstance(it, np.ndarray): # applyS
n = len(it.shape); out = self._fC(np.transpose(it, (*range(level, n), *range(level))), *self.args, **self.kwargs); outN = len(out.shape) # applyS
res = np.transpose(out, (*range(outN - level, outN), *range(outN - level))) # applyS
if hasTorch and isinstance(it, torch.Tensor): # applyS
n = len(it.shape); out = self._fC(torch.transpose_axes(it, (*range(level, n), *range(level))), *self.args, **self.kwargs); outN = len(out.shape) # applyS
res = torch.transpose_axes(out, (*range(outN - level, outN), *range(outN - level))) # applyS
if res is None: return NotImplemented # applyS
elif ogShape != tuple(res.shape[:level]): return NotImplemented # applyS
else: return res # applyS
def __invert__(self): # applyS
"""Configures it so that it expand the arguments out.
Example::
# returns 5
[2, 3] | ~aS(lambda x, y: x + y)
def f(x, y, a=4):
return x*y + a
# returns 10
[2, 3] | ~aS(f)
# returns 11
[2, 3] | ~aS(f, a=5)""" # applyS
if self.inverted: raise Exception("Doesn't support __invert__()ing multiple times") # applyS
f = self._fC; a = self.args; kw = self.kwargs; res = aS(lambda x: f(*init.dfGuard(x), *a, **kw)); # applyS
res.inverted = True; res.preInvAS = self; return res # applyS
def _jsF(self, meta): # applyS
# if len(self.kwargs) > 0: raise Exception("JS does not have the concept of keyword arguments") # applyS
# if len(self.args) > 0: raise Exception("aS._jsF() doesn't support *args yet") # applyS
fIdx = init._jsFAuto(); dataIdx = init._jsDAuto(); argIdx = init._jsDAuto(); inverted = False # applyS
if self.inverted: self = self.preInvAS; inverted = True # applyS
# lookup for custom _jsF() functions # applyS
header, _fIdx, _async = k1lib.kast.asyncGuard(k1lib.kast.prepareFunc3(self.f, ("aS", meta), self.kwargs, self.args)) # applyS
# TODO: might want to inject args right here, on the JS side, instead of on the Python side # applyS
if inverted: return f"{header}\n{fIdx} = {'async ' if _async else ''}({dataIdx}) => {{ return {'await ' if _async else ''}{dataIdx}.aSInv{'_async' if _async else ''}({_fIdx}); }}", fIdx # applyS
else: return header, _fIdx # applyS
# for x,y in self.pattern | grep("\ue157", sep=True).till("\ue239") | cli.apply(cli.join("")) | cli.filt("x") | cli.apply(lambda x: [x, x.replace("\ue157", "${").replace("\ue239", "}")]): p = p.replace(x, y) # applyS
# return f"const {fIdx} = ({dataIdx}) => {dataIdx}.grep(`{p}`)", fIdx # applyS
aS = applyS # applyS
class iterDelay(BaseCli): # iterDelay
def __init__(self, n=5): # iterDelay
"""Iterates through the array, but make sure that there's a bit of a delay.
Example::
# returns [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
range(10) | iterDelay(5) | deref()
At first glance, it's not obvious what's going on. Internally, :class:`iterDelay`
will fetch 5 items from the input iterator right away and stores them in an
internal deque (double-ended queue). Then on the 6th fetch, it will start yielding
elements from the deque, hence introducing a delay while iterating through the input.
I'll admit that this is pretty niche. I normally would not need such a cli when doing
normal analysis, but some clis internally need this, like :class:`apply` when it detects
that there's an _all_opt() optimization. Another use case is the prefetch feature
of :class:`applyTh`, :class:`applyMp` and :class:`applyCl`""" # iterDelay
self.n = n # iterDelay
def __ror__(self, it): # iterDelay
if self.n <= 0: return it # iterDelay
sentinel = object(); q = deque(); it = iter(init.dfGuard(it)) # iterDelay
for i in range(self.n): # iterDelay
e = next(it, sentinel) # iterDelay
if e is sentinel: break # iterDelay
q.append(e) # iterDelay
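# at this point q holds up to n eagerly-fetched items; the generator below keeps it topped up, so consumers always lag the underlying iterator by roughly n elements # iterDelay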
def g(): # iterDelay
while True: # iterDelay
e = next(it, sentinel) # iterDelay
if e is sentinel: break # iterDelay
q.append(e); yield q.popleft() # iterDelay
while len(q) > 0: yield q.popleft() # iterDelay
return g() # iterDelay
# this section of code is a bit magical. The internal dynamics are so damn chaotic, but somehow, they all # iterDelay
# work together beautifully # iterDelay
def _allOpt_gen(a, q, n:int): # a is a complex, not deref-ed structure. Returns flattened a # _allOpt_gen
if n == 0: yield a; return # _allOpt_gen
c = 0; idx = len(q); q.append(None) # _allOpt_gen
for e in a: c += 1; yield from _allOpt_gen(e, q, n-1) # _allOpt_gen
q[idx] = c # _allOpt_gen
def _allOpt_genIr(a, n) -> "(flattened structure, ir, depth)": # a is a complex, not deref-ed structure # _allOpt_genIr
q = deque(); return _allOpt_gen(a, q, n), q, n # _allOpt_genIr
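# rough picture of the scheme above: _allOpt_gen walks the nested structure down to depth n and yields the leaves in order, # _allOpt_genIr
# recording into q how many children each internal node had (the count is only written once that node is exhausted, hence # _allOpt_genIr
# the None placeholder). _allOpt_recover below replays those counts to rebuild the original nesting around a flat stream of leaves # _allOpt_genIr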
def _allOpt_recover(b:Iterator["data_structure"], ir:"Deque[int]", irIdx:Iterator[int], n): # assumes b and ir are iterators # _allOpt_recover
l = next(irIdx) # _allOpt_recover
for i in range(ir[l]): yield list(_allOpt_recover(b, ir, irIdx, n-1)) if n-1 > 0 else next(b) # _allOpt_recover
def _allOpt_recover(b:Iterator["data_structure"], ir:"Deque[int]", irIdx:Iterator[int], n): # assumes b and ir are iterators # _allOpt_recover
l = next(irIdx); c = 0 # _allOpt_recover
while True: # _allOpt_recover
if ir[l] != None and ir[l] <= c: break # _allOpt_recover
c += 1; yield list(_allOpt_recover(b, ir, irIdx, n-1)) if n-1 > 0 else next(b) # that list() right there is quite crucial! # _allOpt_recover
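# note: this second _allOpt_recover shadows the one above; it tolerates counts that haven't been filled in yet (ir[l] still None), which matters because the IR deque is populated lazily while the flattened stream is consumed (see the iterDelay() in apply._allOptF) # _allOpt_recover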
def infIter(): # infIter
i = 0; # infIter
while True: yield i; i += 1 # infIter
class apply(BaseCli): # apply
blurb="Applies a function to all input elements" # apply
def __init__(self, f:Callable[[Any], Any], column:Union[int, List[int]]=None, cache:int=0, **kwargs): # apply
"""Applies a function f to every element in the incoming list/iterator.
Example::
# returns [0, 1, 4, 9, 16]
range(5) | apply(lambda x: x**2) | deref()
# returns [[3.0, 1.0, 1.0], [3.0, 1.0, 1.0]], running the function on the 0th column
torch.ones(2, 3) | apply(lambda x: x+2, 0) | deref()
# returns [[0, -1, 2, 3, -4], [2, -3, 4, 5, -6], [0, -1, 4, 9, -16]], running the function on the 1st (0-index btw) and 4th columns
[[0, 1, 2, 3, 4], [2, 3, 4, 5, 6], [0, 1, 4, 9, 16]] | apply(lambda x: -x, [1, 4]) | deref()
You can also use this as a decorator, like this::
@apply
def f(x):
return x**2
# returns [0, 1, 4, 9, 16]
range(5) | f | deref()
You can also add a cache, like this::
def calc(i): time.sleep(0.5); return i**2
# takes 2.5s
range(5) | repeatFrom(2) | apply(calc, cache=10) | deref()
# takes 5s
range(5) | repeatFrom(2) | apply(calc) | deref()
You can add custom keyword arguments into the function::
def f(x, y, z=3):
return x + y + z
# returns [15, 17, 19, 21, 23]
[range(5), range(10, 15)] | transpose() | ~apply(f, z=5) | deref()
Slight reminder that you can't pass in extra positional args like in :class:`aS`, just extra keyword arguments.
See also: :class:`aS`, :class:`~k1lib.cli.filt.filt`
.. admonition:: JS transpiler notes
So, because JS doesn't have the concept of keyword arguments, ``kwargs`` will have its values
extracted, then injected as positional arguments in the transpiled JS function.
:param column: if not None, then applies the function to that column or columns only
:param cache: if specified, then caches this many values
:param kwargs: extra keyword arguments to pass in the function""" # apply
super().__init__(fs=[f]); self.f = f; self.kwargs = kwargs # f is the original operator, _fC is # apply
if column: # quick type checks # apply
ex = Exception(f"Applying a function on a negative-indexed column ({column}) is not supported") # apply
if isinstance(column, int): # apply
if column < 0: raise ex # apply
else: # apply
column = list(column) # apply
if len([c for c in column if c < 0]): raise ex # apply
self.column = column; self.cache = cache; self._fC = fastF(f) # apply
if cache > 0: self._fC = lru_cache(cache)(self._fC) # apply
self.normal = self.column is None and self.cache == 0 and len(kwargs) == 0 # cached value to say that this apply is just being used as a wrapper, nothing out of the ordinary, like custom columns, cache or custom kwargs # apply
if self.normal: # just propagating information upward, to save runtime graph analysis time # apply
try: self._propagatedF = f._propagatedF; self._applyDepth = f._applyDepth + 1 # assuming f is another apply() # apply
except: self._propagatedF = f; self._applyDepth = 1 # apply
else: self._propagatedF = None; self._applyDepth = 1 # might have to rethink if this depth should be 1 or not # apply
# optimization 1: BaseCli._all_array_opt(), aimed at accelerating array types # apply
self.__arrayTypeF = None # None for not formulated yet, 0 for cannot formulate a faster operation, else the cached, accelerated function (that might not work) # apply
# optimization 2: BaseCli._all_opt(), aimed at accelerating language models # apply
self.__allOptF = None # None for not formulated yet, 0 for cannot formulate a faster operation, else the cached, accelerated function (that will guaranteed to work) # apply
self.inverted = False; self.preInvApply = None # for ._jsF(), to contain information about the apply() pre __invert__(), so that .jsF() can extract out information # apply
@property # apply
def _arrayTypeF(self): # optimization 1: returns None or the function (that might not work) # apply
if self.__arrayTypeF == 0: return None # apply
if self.__arrayTypeF is None: # apply
arrs = []; last = self # figure out the depth # apply
while isinstance(last, apply) and last.normal: arrs.append(last); last = last.f # apply
depth = len(arrs) # apply
if depth == 0: self.__arrayTypeF = 0; return None # apply
if isinstance(last, cli.serial): # breaks up the serial: (A | B.all(2)).all(3) -> A.all(3) | B.all(5) # apply
self.__arrayTypeF = cli.serial(*[(e if isinstance(e, BaseCli) else aS(e)).all(depth) for e in last.clis]); return self.__arrayTypeF # apply
else: # it | A.all(3) -> A._all_array_opt(it, 3). This function might return NotImplemented, which means it can't figure out how to utilize the speed up # apply
if not hasattr(last, "_all_array_opt"): last = aS(last) # apply
self.__arrayTypeF = lambda it: last._all_array_opt(it, depth); return self.__arrayTypeF # apply
return self.__arrayTypeF # apply
@property # apply
def _allOptF(self): # optimization 2: returns None or the function (that has to work all the time!) # apply
if self.__allOptF == 0: return None # apply
if self._propagatedF is None or not hasattr(self._propagatedF, "_all_opt"): self.__allOptF = 0; return None # apply
f = self._propagatedF._all_opt # apply
def inner(it): # has to regenerate the IR on each pass through. Slow (O(30*n) or so), but the function this is supposed to run (LLMs), are even slower, so this is fine for now # apply
af, ir, n = _allOpt_genIr(it, self._applyDepth) # af = a flat # apply
af = af | iterDelay() # apply
return _allOpt_recover(iter(f(af)), ir, infIter(), n) # apply
return inner # apply
def _typehint(self, inp): # apply
if self.column is None: # apply
if isinstance(inp, tListIterSet): # apply
try: return tIter(self.f._typehint(inp.child)) # apply
except: return tIter(tAny()) # apply
return super()._typehint(inp) # apply
def _copy(self): return apply(self.f, self.column, self.cache, **self.kwargs) # ~apply() case handled automatically # apply
def __ror__(self, it:Iterator[str]): # apply
c = self.column; f = self._fC; ogF = self.f; farr = (getattr(ogF, "_all_opt2", None) or f); kwargs = self.kwargs # apply
if c is None: # apply
if self.normal: # apply
if isinstance(it, settings.arrayTypes): # optimization 1 # apply
af = self._arrayTypeF # apply
if af is not None: # there're lots of code here, but it doesn't impact perf cause it's done once for each array object # apply
try: # apply
ans = af(it) # apply
if ans is not NotImplemented: return ans # apply
except init.ArrayOptException as e: raise e # this is a special exception. If the function throws this then the error relates to incorrect constraints, and not accidental, so shouldn't be ignored # apply
except: pass # apply
self.__arrayTypeF = 0 # tried to use the accelerated version, but failed, so won't ever try the accelerated version again # apply
elif self._allOptF is not None: return self._allOptF(it) # optimization 2, for LLMs # apply
elif hasPandas and isinstance(it, pd.core.arraylike.OpsMixin): # apply
it = init.dfGuard(it) # apply
try: # apply
res = farr(it) # apply
if res is not NotImplemented: return res # apply
except: return np.array([f(x) for x in it]) # apply
return (f(line, **kwargs) for line in init.dfGuard(it)) # apply
elif isinstance(c, int): # apply
if hasPandas and isinstance(it, pd.DataFrame): # apply
it = it.copy(); colN = list(it)[c] # apply
try: it[colN] = f(it[colN]) # apply
except: it[colN] = [f(x) for x in it[colN]] # apply
return it # apply
def gen(it): # apply
for row in it: row = list(row); row[c] = f(row[c], **kwargs); yield row # apply
return gen(it) # return ([(e if i != c else f(e, **kwargs)) for i, e in enumerate(row)] for row in it) # old version # apply
else: # List[int] # apply
if hasPandas and isinstance(it, pd.DataFrame): # apply
it = it.copy() # apply
for c_ in c: # apply
colN = list(it)[c_] # apply
try: it[colN] = f(it[colN]) # apply
except: it[colN] = [f(x) for x in it[colN]] # apply
return it # apply
def gen(it): # apply
for row in it: # apply
row = list(row) # apply
for c_ in c: row[c_] = f(row[c_], **kwargs) # apply
yield row # apply
return gen(it) # apply
def __invert__(self): # apply
"""Same mechanism as in :class:`applyS`, it expands the
arguments out. Just for convenience really. Example::
# returns [10, 12, 14, 16, 18]
[range(5), range(10, 15)] | transpose() | ~apply(lambda x, y: x+y) | deref()""" # apply
if self.inverted: raise Exception("Doesn't support __invert__()ing multiple times") # apply
res = apply(lambda x: self._fC(*x, **self.kwargs), self.column, self.cache) # apply
res.preInvApply = self; res.inverted = True; return res # apply
def _jsF(self, meta): # apply
if self.cache != 0: raise Exception("apply._jsF() doesn't support caching values yet") # apply
fIdx = init._jsFAuto(); dataIdx = init._jsDAuto(); kwIdx = init._jsDAuto(); argIdx = init._jsDAuto(); inverted = False # apply
if self.inverted: self = self.preInvApply; inverted = True # apply
header, _fIdx, _async = k1lib.kast.asyncGuard(k1lib.kast.prepareFunc3(self.f, ("apply", meta), self.kwargs)) # apply
return f"{header}\n{kwIdx} = {json.dumps(self.kwargs)};\n{fIdx} = {'async ' if _async else ''}({dataIdx}) => {dataIdx}.apply{'_async' if _async else ''}({'async ' if _async else ''}({argIdx}) => {'await ' if _async else ''}{_fIdx}({'...' if inverted else ''}{argIdx}), {cli.kjs.v(self.column)}, {kwIdx}, false)", fIdx # apply
# apply
# old code below, with args (self, kast, jsFnVars:"list[str]", **kwargs) # apply
argVars = kast.kast_lambda(self.f); var = ",".join(argVars) # apply
fn, header = kast.kast_prepareFunc(self.f, [*argVars, *jsFnVars]) # apply
col = self.column # apply
if col is None: return f".apply(({var}) => {fn})", header # apply
else: # apply
cols = [col] if isinstance(col, int) else col # apply
return "".join([f".apply(({var}) => {fn}, {c})" for c in cols]), header # apply
cloudpickle = k1lib.dep("cloudpickle", url="https://github.com/cloudpipe/cloudpickle") # apply
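# executeFunc below runs inside a worker process: `common` carries the serialized (function, kwargs) pair and `line` one serialized input element; dill is tried first when scheduling, with cloudpickle as the fallback (see applyMp.__ror__) # executeFunc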
def executeFunc(common, line, usingDill): # executeFunc
import time # executeFunc
if usingDill: # executeFunc
import dill; f, kwargs = dill.loads(common) # executeFunc
res = f(dill.loads(line), **kwargs) # executeFunc
else: # executeFunc
import cloudpickle; f, kwargs = cloudpickle.loads(common) # executeFunc
res = f(cloudpickle.loads(line), **kwargs) # executeFunc
time.sleep(0.1); return res # suggestion by https://stackoverflow.com/questions/36359528/broken-pipe-error-with-multiprocessing-queue # executeFunc
def terminateGraceful(): signal.signal(signal.SIGINT, signal.SIG_IGN) # terminateGraceful
_k1_applyMp_global_ctx = {}; _k1_applyMp_global_ctx_autoInc = k1lib.AutoIncrement(prefix="_k1_applyMp") # terminateGraceful
class applyMp(BaseCli): # applyMp
blurb="Applies a function to all input elements across multiple processes" # applyMp
_pools = set() # applyMp
_torchNumThreads = None # applyMp
def __init__(self, f:Callable[[Any], Any], prefetch:int=None, timeout:float=8, utilization:float=0.8, bs:int=1, newPoolEvery:int=0, **kwargs): # applyMp
"""Like :class:`apply`, but execute a function over the input iterator
in multiple processes. Example::
# returns [3, 2]
["abc", "de"] | applyMp(len) | deref()
# returns [5, 6, 9]
range(3) | applyMp(lambda x, bias: x**2+bias, bias=5) | deref()
# returns [[1, 2, 3], [1, 2, 3]], demonstrating outside vars work
someList = [1, 2, 3]
["abc", "de"] | applyMp(lambda s: someList) | deref()
Internally, this will continuously spawn new jobs up until 80% of all CPU
cores are utilized. On POSIX systems, the default multiprocessing start method is
``fork()``, which roughly means that all the variables in memory will be copied
over. On Windows and macOS, the default start method is ``spawn``, meaning each
child process is a completely new interpreter, so you have to pass in all required
variables and reimport all dependencies. Read more at https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
If you don't wish to schedule all jobs at once, you can specify a ``prefetch``
amount, and it will only schedule that many jobs ahead of time. Example::
range(10000) | applyMp(lambda x: x**2) | head() | deref() # 700ms
range(10000) | applyMp(lambda x: x**2, 5) | head() | deref() # 300ms
# demonstrating there're no huge penalties even if we want all results at the same time
range(10000) | applyMp(lambda x: x**2) | deref() # 900ms
range(10000) | applyMp(lambda x: x**2, 5) | deref() # 1000ms
The first line will schedule all jobs at once, and thus will require more RAM and
compute power, even though we discard most of the results anyway (the
:class:`~k1lib.cli.filt.head` cli). The second line only schedules 5 jobs ahead of
time, and thus will be much more efficient if you don't need all results right
away.
.. note::
Remember that every :class:`~k1lib.cli.init.BaseCli` is also a
function, meaning that you can do stuff like::
# returns [['ab', 'ac']]
[["ab", "cd", "ac"]] | applyMp(filt(op().startswith("a")) | deref()) | deref()
Also remember that the return result of ``f`` should be serializable, meaning it
should not be a generator. That's why in the example above, there's a ``deref()``
inside f. You should also convert PyTorch tensors into NumPy arrays.
Most of the time, you would probably want to set ``bs`` to something bigger than 1
(maybe 32 or so). This will execute ``f`` multiple times in a single job,
instead of executing ``f`` only once per job, which should reduce the overhead of job
creation dramatically.
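As a rough sketch (exact numbers don't matter), batching looks like this::
    # each job now processes 32 elements, so only ~32 jobs get created instead of 1000
    range(1000) | applyMp(lambda x: x**2, bs=32) | deref()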
If you encounter strange errors not seen on :class:`apply`, you can try to clear all
pools (using :meth:`clearPools`), to terminate all child processes and thus free
resources. In earlier versions, you had to do this manually before exiting, but now
:class:`applyMp` is much more robust.
Also, you should not immediately assume that :class:`applyMp` will always be faster
than :class:`apply`. Remember that :class:`applyMp` will create new processes,
serialize and transfer data to them, execute it, then transfer data back. If your code
transfers a lot of data back and forth (compared to the amount of computation done), or
the child processes don't have a lot of stuff to do before returning, it may very well
be a lot slower than :class:`apply`.
There's a potential loophole here that can make your code faster. Because the main
process is forked (at least on linux), every variable is still there, even the big
ones. So, you can potentially do something like this::
bigData = [] # 1B items in the list
# summing up all items together. No input data transfers (because it's forked instead)
range(1_000_000_000) | batched(100) | applyMp(lambda r: r | apply(lambda i: bigData[i]) | toSum()) | toSum()
In fact, I use this loophole all the time, and have thus made the function :meth:`shared`,
so check it out.
:param prefetch: if not specified, schedules all jobs at the same time. If
specified, schedules jobs so that there'll only be a specified amount of
jobs, and will only schedule more if results are actually being used.
:param timeout: seconds to wait for job before raising an error
:param utilization: what fraction of the cores to use: 0 for no cores, 1 for
all the cores. Defaults to 0.8
:param bs: if specified, groups ``bs`` number of transforms into 1 job to be more
efficient.
:param kwargs: extra arguments to be passed to the function. ``args`` not
included as there're a couple of options you can pass for this cli.
:param newPoolEvery: creates a new processing pool after every this many inputs have
been fed. 0 for not refreshing any pools at all. Turn this on in case your processes consume
lots of memory and you have to kill them eventually to free it up""" # applyMp
super().__init__(fs=[f]); self.f = fastF(f) # applyMp
self.prefetch = prefetch or int(1e9) # applyMp
self.timeout = timeout; self.utilization = utilization # applyMp
self.bs = bs; self.kwargs = kwargs; self.p = None # applyMp
self.newPoolEvery = newPoolEvery; self.ps = []; self._serializeF = True # applyMp
def __ror__(self, it:Iterator[Any]) -> Iterator[Any]: # applyMp
timeout = self.timeout; f = self.f # really make sure it's an iterator, for prefetch # applyMp
if self.bs > 1: return it | cli.batched(self.bs, True) | applyMp(apply(f) | cli.toList(), self.prefetch, timeout, **self.kwargs) | cli.joinStreams() # applyMp
def newPool(): # applyMp
if hasTorch: # applyMp
try: applyMp._torchNumThreads = applyMp._torchNumThreads or torch.get_num_threads(); torch.set_num_threads(1) # applyMp
except: pass # why do all of this? Because some strange interaction between PyTorch and multiprocessing, outlined here: https://github.com/pytorch/pytorch/issues/82843 # applyMp
os.environ["py_k1lib_in_applyMp"] = "True" # applyMp
self.p = mp.Pool(int(mp.cpu_count()*self.utilization), terminateGraceful); self.ps.append(self.p) # applyMp
if hasTorch and applyMp._torchNumThreads is not None: torch.set_num_threads(applyMp._torchNumThreads) # applyMp
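# intercept() below tears down the current pool and builds a fresh one after every n input elements (the newPoolEvery option) - handy when worker processes slowly leak memory # applyMp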
def intercept(it, n): # applyMp
for i, e in enumerate(it): # applyMp
if i % n == 0: # applyMp
if self.p is not None: self.p.close(); self.ps.remove(self.p) # applyMp
gc.collect(); newPool() # applyMp
yield e # applyMp
try: common = dill.dumps([f, self.kwargs]); usingDill = True # applyMp
except: common = cloudpickle.dumps([f, self.kwargs]); usingDill = False # applyMp
def gen(it): # applyMp
with k1lib.captureStdout(False, True) as out: # applyMp
try: # applyMp
if self.newPoolEvery > 0: it = intercept(it, self.newPoolEvery) # applyMp
else: newPool() # applyMp
yield from it | apply(lambda line: self.p.apply_async(executeFunc, [common, dill.dumps(line), usingDill])) | iterDelay(self.prefetch) | apply(lambda x: x.get(timeout)) # applyMp
except KeyboardInterrupt as e: # applyMp
print("applyMp interrupted. Terminating pool now") # applyMp
for p in self.ps: p.close(); p.terminate(); # applyMp
raise e # applyMp
except Exception as e: # applyMp
print("applyMp encounter errors. Terminating pool now") # applyMp
for p in self.ps: p.close(); p.terminate(); # applyMp
raise e # applyMp
else: # applyMp
for p in self.ps: p.close(); p.terminate(); # applyMp
return gen(it) # applyMp
@staticmethod # applyMp
def cat(fileName: str, f:Callable, n:int=None, rS=None, **kwargs): # applyMp
"""Like :meth:`applyCl.cat`, this will split a file up into multiple
sections, execute ``f`` over all sections and return the results.
Example::
fn = "~/repos/labs/k1lib/k1lib/cli/test/applyMp.cat"
"0123456789\\n"*100 | file(fn)
# returns [6, 6, 6, 7, 6, 6, 6, 7, 6, 6, 6, 7, 6, 6, 6, 8]
applyMp.cat(fn, shape(0), 16) | deref()
:param f: function to execute on an iterator of lines
:param n: how many chunks should it split the file into. Defaulted to the number of cpu cores available
:param rS: :class:`~k1lib.cli.inp.refineSeek` instance, if you need more fine-grained
control over section boundaries so as to not make everything corrupted
:param kwargs: extra keyword arguments for :class:`applyMp`""" # applyMp
return fileName | cli.splitSeek(n or os.cpu_count()) | (rS or cli.iden()) | cli.window(2) | ~applyMp(lambda x,y: cli.cat(fileName, sB=x, eB=y) | f, **kwargs) # applyMp
@staticmethod # applyMp
def shared(f, **kwargs): # applyMp
"""Execution model where the input iterator is dereferenced and shared across
all processes, bypassing serialization. Example::
a = range(1_000_000_000) | apply(lambda x: x*1.5 - 2000) | aS(list) # giant data structure
a | batched(50_000_000, True) | applyMp(toSum()) | toSum() # has to serialize and deserialize lists of numbers, which wastes lots of cpu cycles and memory
a | applyMp.shared(toSum()) | toSum() # giant data structure is forked, no serialization happens, no memory even gets copied, much faster
In the 2nd line, most of the time is spent on serializing the data and transferring
it to other processes, while in the 3rd line, most of the time is spent on calculating
the sum instead, as the giant data structure is forked, and Linux doesn't copy it internally.""" # applyMp
def inner(it): # applyMp
try: n = len(it) # applyMp
except: it = list(it); n = len(it) # applyMp
# this is pretty unintuitive right? Why do it this way? Turns out, if you were to reference `it` directly, it will store it in f's co_freevars, # applyMp
# which will be serialized, defeating the purpose. Moving it to a global variable forces it to move to co_names instead, avoiding serialization. This took forever to understand # applyMp
idx = _k1_applyMp_global_ctx_autoInc(); _k1_applyMp_global_ctx[idx] = it # applyMp
res = range(n) | cli.batched(round(n/os.cpu_count()+1), True) | applyMp(lambda r: f(_k1_applyMp_global_ctx[idx][r.start:r.stop]), **kwargs) | aS(list) # applyMp
_k1_applyMp_global_ctx[idx] = None; return res # applyMp
return aS(inner) # applyMp
def _copy(self): return applyMp(self.f, self.prefetch, self.timeout, self.utilization, self.bs, self.newPoolEvery, **self.kwargs) # applyMp
def __invert__(self): # applyMp
"""Expands the arguments out, just like :class:`apply`.
Example::
# returns [20, 20, 18, 14, 8, 0, -10, -22, -36, -52]
[range(10), range(20, 30)] | transpose() | ~applyMp(lambda x, y: y-x**2) | deref()""" # applyMp
res = self._copy(); f = res.f; res.f = lambda x: f(*x); return res # applyMp
@staticmethod # applyMp
def clearPools(): # applyMp
"""Terminate all existing pools. Do this before restarting/quitting the
script/notebook to make sure all resources (like GPU) are freed. **Update**:
you probably won't have to call this manually anymore since version 0.9, but
if you run into problems, try doing this.""" # applyMp
for p in applyMp._pools: # applyMp
try: p.terminate() # applyMp
except: pass # applyMp
applyMp._pools = set() # applyMp
@staticmethod # applyMp
def pools(): # applyMp
"""Get set of all pools. Meant for debugging purposes only.""" # applyMp
return applyMp._pools # applyMp
def __del__(self): # applyMp
return # applyMp
if hasattr(self, "p"): # applyMp
self.p.terminate(); # applyMp
if self.p in applyMp._pools: applyMp._pools.remove(self.p) # applyMp
# apparently, this doesn't do anything, at least in jupyter environment # applyMp
atexit.register(lambda: applyMp.clearPools()) # applyMp
parallel = applyMp # applyMp
s = k1lib.Settings(); settings.add("applyCl", s, "modifier.applyCl() settings") # applyMp
s.add("sudoTimeout", 300, "seconds before deleting the stored password for sudo commands") # applyMp
s.add("cpuLimit", None, "if specified (int), will not schedule more jobs if the current number of assigned cpus exceeds this") # applyMp
_password = k1lib.Wrapper(None); _cpuUsed = k1lib.Wrapper(0) # applyMp
def removePw(): # removePw
while True: time.sleep(settings.applyCl.sudoTimeout); _password.value = None # removePw
t = threading.Thread(target=removePw, daemon=True).start() # removePw
_nodeIdsCache = k1lib.Wrapper([]) # removePw
def specificNode(f, nodeId:str, num_gpus=0): # modify a function so that it will only run on a specific node only # specificNode
if num_gpus > 0: # specificNode
#return f.options(num_gpus=num_gpus, scheduling_strategy="SPREAD") # specificNode
return f.options(num_gpus=num_gpus, scheduling_strategy=ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(node_id=nodeId, soft=False)) # specificNode
else: return f.options(scheduling_strategy=ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(node_id=nodeId, soft=False)) # specificNode
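# exportSe/movePropsSe below are a pair: exportSe dumps the picklable parts of a Settings tree into plain nested dicts (so they survive cloudpickle), while movePropsSe applies such a dict back onto the Settings object living in a remote worker, keeping both sides in sync # exportSe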
def exportSe(se): # exportSe
if not isinstance(se, k1lib.Settings): return se # exportSe
return {k:exportSe(v) for k,v in se.__dict__.items() if not k.startswith("_") and k != "kjs"} # excluding dicts of kjs transpilers cause that causes problems for applyCl() as it's using cloudpickle # exportSe
def movePropsSe(obj, se): # movePropsSe
d = se.__dict__; keys = [e for e in d.keys() if not e.startswith("_")] # movePropsSe
for key in keys: # movePropsSe
if key not in obj or key == "kjs": continue # movePropsSe
if isinstance(d[key], k1lib.Settings): movePropsSe(obj[key], d[key]) # movePropsSe
else: d[key] = obj[key] # movePropsSe
_applyCl_soCache = set() # dynamic library (.so) that has been installed across all nodes, so don't have to reimport # movePropsSe
class applyCl(BaseCli): # applyCl
blurb="Applies a function to all input elements across a Ray cluster" # applyCl
def __init__(self, f, prefetch=None, timeout=60, bs=1, rss:Union[dict, str]={}, pre:bool=False, num_cpus=1, num_gpus=0, memory=None, resolve=True, **kwargs): # applyCl
"""Like :class:`apply`, but execute a function over the input iterator
in multiple processes on multiple nodes inside of a cluster (hence "cl"). So, just a more
powerful version of :class:`applyMp`, assuming you have a cluster to run it on.
Example::
# returns [3, 2]
["abc", "de"] | applyCl(len) | deref()
# returns [5, 6, 9]
range(3) | applyCl(lambda x, bias: x**2+bias, bias=5) | deref()
# returns [[1, 2, 3], [1, 2, 3]], demonstrating outside vars work
someList = [1, 2, 3]
["abc", "de"] | applyCl(lambda s: someList) | deref()
nIds = applyCl.nodeIds()
# returns [[<nodeId1>, 0], [<nodeId2>, 1], ...], demonstrating preserve mode
[nIds, range(10)] | transpose() | applyCl(lambda x: x**2, pre=True) | deref()
# executes the function, but stores the result on remote nodes, instead of copying result to this node
a = range(5) | applyCl(lambda x: x**2, resolve=False) | deref()
# returns [0, 1, 4, 9, 16]
a | applyCl(lambda x: x) | deref()
Summary of all mode of operations::
# Data types:
# - 1: literal value, just a normal Python object
# - or1: ray.ObjectRef object - Ray's reference to a remote object living somewhere
# - h1: Handle object - k1lib's reference to a remote object, obtained if `resolve` is set to False. Use `h.get()` to fetch the actual value
# - n1: node id, string
[1/or1/h1, 2/or2/h2] | applyCl(...) # returns [1, 2, 3]. "1/or1/h1" means that the input can be a list of literals, ObjectRef, or Handle
[1/or1/h1, 2/or2/h2] | applyCl(..., resolve=False) # returns [h1, h2, h3]
[[n1/h1, 1/or1/h3], [n2/h2, 2/or2/h4]] | applyCl(..., pre=True) # returns [[n1/h1, 1], [n2/h2, 2]], executed on n1/h1, h3 is copied over
[[n1/h1, 1/or1/h3], [n2/h2, 2/or2/h4]] | applyCl(..., pre=True, resolve=False) # returns [[n1/h1, h3], [n2/h2, h4]]
[n1, n2] | applyCl.aS(lambda: ...) # returns [[n1, 1], [n2, 2]]
None | applyCl.aS(lambda: ...) # returns [[n1, 1], [n2, 2], ...], executes once on all nodes
[n1, n2] | applyCl.aS(lambda: ..., resolve=False) # returns [[n1, h1], [n2, h2]]
Internally, this uses the library Ray (https://www.ray.io) to do the heavy
lifting. So, :class:`applyCl` can be thought of as a thin wrapper around that
library, but still has the same consistent interface as :class:`apply` and
:class:`applyMp`. From all of my tests so far, it seems that :class:`applyCl`
works quite well and is quite robust, so if you have access to a cluster, use
it over :class:`applyMp`.
The library will connect to a Ray cluster automatically when you import
everything using ``from k1lib.imports import *``. It will execute
``import ray; ray.init()``, which is quite simple. If you have ray installed,
but do not want this default behavior, you can do this::
import k1lib
k1lib.settings.startup.init_ray = False
from k1lib.imports import *
As with :class:`applyMp`, there are pitfalls and weird quirks to multiprocessing,
on 1 or multiple nodes, so check out the docs over there to be aware of them,
as those translate well to here.
There's more extensive documentation in these notebooks: `27-multi-node <https://mlexps.com/other/27-multi-node/>`_ and
`30-applyCl-benchmarks <https://mlexps.com/other/30-applyCl-benchmarks/>`_, if you want to get a better feel for this
tool.
.. admonition:: Time sharing the cluster
Let's say that the cluster is located in a company and multiple users want to
access it; then you might have to think about it a little more. Say the cluster has
60 cores, and someone has launched a long-running job: 2160 tasks, 10 minutes/task,
1 core/task, totalling 6 hours. Now you want to launch another job with 20 tasks,
requiring 10 cores, 1 second/task, which would total 2 seconds on an idle cluster.
All modern schedulers (Ray, Slurm, Spark, etc) can't schedule your 20 tasks immediately.
They have to wait for some running tasks to finish before scheduling yours. This means you
have to wait on average 5-10 minutes before all of your tasks finish. This might be
fine if you've used Slurm a lot, but it's extremely not okay for me and my patience. The whole
point of a cluster is to get results immediately, within a few seconds. So here's a
workaround::
# long running task, on notebook 1
from k1lib.imports import *
settings.cli.applyCl.cpuLimit = 40
range(2160) | applyCl(lambda x: time.sleep(10*60)) | ignore() # long running task
# short running task, on notebook 2
from k1lib.imports import *
range(20) | applyCl(lambda x: time.sleep(1)) | ignore() # short running task, should finish almost immediately
Essentially, there's that setting that you can adjust. Like with Ray's ``num_cpus``,
this is merely a suggestion to my library not to schedule jobs past that cpu limit,
and it can be circumvented in some strange edge cases that I'm too lazy to handle.
Likewise, when you schedule a Ray task, you can specify that it will only take 1 cpu,
but the task can end up forking into 5 different processes, which can cause congestion
and memory thrashing. If Ray doesn't get this right (possibly impossible to do anyway),
then do I really have to?
.. admonition:: Advanced use case
Not really advanced, but just a bit difficult to understand/follow. Let's say
that you want to scan through the home directory of all nodes, grab all files,
read them, and get the number of bytes they have. You can do something like this::
a = None | applyCl.aS(lambda: None | cmd("ls ~") | filt(os.path.isfile) | deref()) | deref()
b = a | ungroup() | deref()
c = b | applyCl(cat(text=False) | shape(0), pre=True) | deref()
d = c | groupBy(0, True) | apply(item().all() | toSum(), 1) | deref()
Granted, this is relatively complex. Let's see what A, B, C and D look like::
# A
[['7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d', ['Miniconda3-latest-Linux-x86_64.sh', 'mintupgrade-2023-04-01T232950.log']],
['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', ['5a', 'abc.jpg', 'a.txt']]]
# B
[['7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d', 'Miniconda3-latest-Linux-x86_64.sh'],
['7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d', 'mintupgrade-2023-04-01T232950.log'],
['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', '5a'],
['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 'abc.jpg'],
['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 'a.txt']]
# C
[['7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d', 74403966],
['7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d', 1065252],
['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 2601],
['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 16341],
['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 10177]]
# D
[['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 92185432],
['7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d', 75469218]]
The steps we're concerned with are A and C. In step A, we're running 2 processes, 1 for each
node, to get all the file names in the home directory. In step C, we're running 5 processes
total, 2 on the first node and 3 on the second node. Each process reads its file as
bytes and counts up those bytes. Finally in step D, the results are grouped together and the
sizes summed.
So yeah, it's pretty nice that we did all of that in a relatively short amount of code.
The data is distributed too (reading multiple files from multiple nodes), so we're truly
not bottlenecked by anything.
.. admonition:: Context object handle
Let's say you have these unresolved handles::
# creates a bunch of infinite random number generators, one on each node
its = None | applyCl.aS(lambda: repeatF(lambda: random.randint(2, 15)), resolve=False) | deref()
# gets the next value of all generators, can return [4, 4, 9, 2] for a 4-node cluster
its | cut(1) | applyCl(lambda x: next(x)) | deref()
# gets the next value of all generators, add 0 to the 1st generator, 1 to the 2nd generator, etc, then return the resulting output that might look like [3, 16, 10, 7]
[its | cut(1), range(4)] | transpose() | applyCl(lambda x: next(ctxHandle) + x, pre=True) | cut(1) | deref()
So, the special thing here is the variable `ctxHandle` on the last line. That is a
special variable that is injected along the way. Why all this complexity?
The whole idea with unresolved object handles is that you have a distributed complex
data structure that can't be serialized and juggled around easily. That's the `its` handles
in the example. Then, you might want to feed in some (simple, serializable) input X,
change the complex data structure in its own process, then return some (simple, serializable)
output Y. In the example, X is range(4), while Y is the resulting number array.
.. admonition:: Cython
Even when running everything distributed like this, you might run into speed issues.
Then, you'll essentially have 2 options: write (pleasant) Cython code, or
write (unpleasant) C/C++ Python extensions. If you were to choose the C/C++
option, then here's the flow:
- Develop Python C extension, export everything as a shared library (a single .so file)
- Execute ``applyCl.installSo("library.so")`` to install the library to all nodes
- Use functions provided by your library normally, like ``import yourlibrary; range(10) | applyCl(yourlibrary.afunction) | deref()``
But applyCl can deal with Cython functions directly in your notebook. Here's the flow:
- Annotate a code cell with the magic "%%cython", write Cython code as usual
- Just use that function normally
Let's see an example::
# ---------- code cell 1 ----------
from k1lib.imports import * # cython ipython extension is automatically loaded
# ---------- code cell 2 ----------
%%cython
from k1lib.cli import ls # demonstrating that you can use all of the existing tools and libraries as usual
cdef g(a:int): return f"{a} 123" # demonstrating that you can refactor out to other functions
def f (a:int): return [g(a), ls(".")]
# ---------- code cell 3 ----------
range(10) | applyCl(f) | deref()
You only have to install Cython on the current node and not the other nodes. Also note
that currently, this only supports passing Cython-compiled functions directly into
``applyCl()`` or ``applyCl.aS()``. You can't pass a normal Python function that uses a
Cython function like this::
# ---------- code cell 1 ----------
from k1lib.imports import *
# ---------- code cell 2 ----------
%%cython
from k1lib.cli import ls # note: have to reimport here because all the symbols inside this code block are independent from the rest of the notebook
cpdef g(a:int): return f"{a} 123"
# ---------- code cell 3 ----------
def f (a:int): return [g(a), ls(".")]
range(10) | applyCl(f) | deref() # this throws an import error, as the compiled code won't be installed on the remote nodes
This behavior can potentially be fixed in the future, but I'm lazy and it's not hard
to follow the rules. The dynamic libraries will be installed in the working directory.
You can delete them after a coding session to free up some space, but they're likely to be
tiny, so you don't really have to worry about it.
Also, like everything else in parallel programming, please benchmark absolutely everything
because it might even be slower using Cython if internally you're allocating space for
large data structures constantly, compared to the cli tools' lazy execution model. For operations
that work on giant files, I actually find it very difficult to gain any appreciable speedups
using Cython, as cli tools are already pretty optimized, so the best task for this is probably
long-running, complex mathematical modelling, and not generic text manipulation.
.. warning::
Just like with any other parallel processing model, there are some quirks that
can happen behind the scenes that aren't quite what you expected, as this is
incredibly tricky. Dig into Ray's serialization page (https://docs.ray.io/en/latest/ray-core/objects/serialization.html)
or their whitepapers (https://docs.ray.io/en/latest/ray-contribute/whitepaper.html)
to get a feel for how it all works underneath. The notable quirks that you might need to think about are:
- A lot of the internal code assumes that you're on a Unix system, preferably Linux,
so it might not work on other platforms like Windows. But honestly, screw Windows.
:param prefetch: if not specified, schedules all jobs at the same time. If
specified, schedules jobs so that there'll only be a specified amount of
jobs, and will only schedule more if results are actually being used.
:param timeout: seconds to wait for job before raising an error
:param bs: if specified, groups ``bs`` number of transforms into 1 job to be more
efficient.
:param rss: resources required for the task. Can be {"custom_resource1": 2} or "custom_resource1" as a shortcut
:param pre: "preserve", same convention as :meth:`applyCl.aS`. If True, then allow passing
through node ids as the first column to schedule jobs on those specific nodes only
:param num_cpus: how many cpu does each task take?
:param memory: how much memory to give to the task, in bytes
:param resolve: whether to resolve the outputs or not. Set this to False to not move
memory to the requesting node and cache the big data structure on the remote node
:param kwargs: extra arguments to be passed to the function. ``args`` not
included as there're a couple of options you can pass for this cli.""" # applyCl
super().__init__(fs=[f]); _fC = fastF(f); self.ogF = f; self.pre = pre; self.num_cpus = num_cpus # applyCl
try: # f might be a string, so can't do f.__module__ # applyCl
isCythonFunc = "cython" in f.__module__ # applyCl
if isCythonFunc: applyCl.installSo(sys.modules[f.__module__].__file__) # applyCl
except: isCythonFunc = False # applyCl
self.rss = rss = {rss: 1} if isinstance(rss, str) else rss # applyCl
cwd = os.getcwd(); se = exportSe(k1lib.settings) # applyCl
def remoteF(s, e, idxCtxHandle=None): # function that will be executed on remote node. Have to setup environment a little bit before executing # applyCl
# e: the real element. s is just e's storage context, in case e is a Handle. Else s is not used and can be None. Why? Because Actors can't be # applyCl
# serialized directly with cloudpickle, but it can be passed as function parameters, which Ray will do some special sauce serialization # applyCl
import k1lib; movePropsSe(se, k1lib.settings) # do this to sync current settings with the remote worker nodes # applyCl
if k1lib.settings.startup.or_patch.numpy: k1lib.cli.init.patchNumpy() # applyCl
if k1lib.settings.startup.or_patch.dict: k1lib.cli.init.patchDict() # applyCl
import os; os.makedirs(cwd, exist_ok = True); os.chdir(cwd) # applyCl
if isinstance(e, Handle): e.setStorage(s); e = e.get() # applyCl
if idxCtxHandle: _fC.__globals__["ctxHandle"] = s.d[idxCtxHandle] # applyCl
return _fC(e, **kwargs) # applyCl
# self.remoteF = remoteF; f = ray.remote(resources=rss, num_cpus=num_cpus, **({"memory": memory} if memory else {}))(remoteF) # applyCl
rssKw = {"resources": rss, "num_cpus": num_cpus, "num_gpus": num_gpus, **({"memory": memory} if memory else {})}#; rssKw = None # applyCl
self.prefetch = prefetch or int(1e8); self.timeout = timeout; self.bs = bs # applyCl
self._copyCtx = lambda: [f, [prefetch, timeout, bs, rss, pre, num_cpus, num_gpus, memory, resolve], kwargs] # applyCl
nodeId = applyCl.nodeId(); rssF = ray.remote(**rssKw)(remoteF) # f that has constraints injected into it # applyCl
def preprocessF(e): # return Handle (if pre=False), or [nodeId, Handle] (if pre=True). f is remoteF, core element can be a Handle, real object, or ObjectRef # applyCl
if resolve: # storage location managed by ray, returns or2/h2 or [nId/h1, or2/h2] # applyCl
if pre: # applyCl
a, b = e; s = extractStorage(b) # applyCl
if isinstance(a, Handle): h = a.deposit(b); return [a, h.executeAsync(remoteF, rssKw, a.idx)], [a,h] # applyCl
else: return [a, specificNode(rssF, a, num_gpus=num_gpus).remote(s, b)], [] # applyCl
else: # applyCl
if isinstance(e, Handle): return e.executeAsync(remoteF, rssKw), [e] # applyCl
else: return rssF.remote(None, e), [] # applyCl
else: # storage location explicitly managed by me, returns h2 or [nId/h1, h2] # applyCl
storageWarmup() # applyCl
if pre: # applyCl
a, b = e; s = extractStorage(b) # applyCl
if isinstance(a, Handle): # [h1, 2/or2/h2] # applyCl
h = a.deposit(b) # first deposits b into a's storage context to get handle. h and a have the same storage context # applyCl
a.report("report7"); b.report("report7") # applyCl
return [a, h.executeAsync(remoteF, rssKw, a.idx)], [a,b,h] # then executes it in h's storage context # applyCl
else: # [nId, 2/or2/h2] # applyCl
if isinstance(b, Handle) and b.nodeId == nodeId: return [a, b.executeAsync(remoteF, rssKw)], [b] # [nId, h2], if h2 is on nId, then use h2's storage context # applyCl
h = Handle.create(b, a, num_gpus=num_gpus); return [a, h.executeAsync(remoteF, rssKw)], [h] # create storage context on `a`, deposits b on it, then execute # applyCl
else: # 1/or1/h1 # applyCl
s = extractStorage(e) # applyCl
if isinstance(e, Handle): return e.executeAsync(remoteF, rssKw), [e] # has storage context already, execute on it directly # applyCl
else: h = Handle.create(e, num_gpus=num_gpus); return h.executeAsync(remoteF, rssKw), [h] # create storage context, deposit e on it, then execute # applyCl
@ray.remote # applyCl
def resolveFRemote(o): return 1 # applyCl
def resolveF(e): # applyCl
e = e[0] # why do this? Because of a pretty obscure bug related to the reference counting mechanism I have here # applyCl
# in preprocessF(), it returns [meats, Python references to (potential) Handles] # applyCl
# I have to do this in order to keep the Handles alive. After resolveF(), everything is settled, so # applyCl
# old Handles can safely be deleted # applyCl
if resolve: # applyCl
if pre: a, b = e; return [a, b.block().get() if isinstance(b, Handle) else ray.get(b)] # applyCl
else: return e.block().get() if isinstance(e, Handle) else ray.get(e) # applyCl
else: # applyCl
return [e[0], e[1].block()] if pre else e.block() # don't resolve to this node, but still block execution until that object is resolvable # applyCl
self.preprocessF = preprocessF; self.resolveF = resolveF; applyCl.preprocessF = preprocessF; applyCl.resolveF = resolveF # references for lprun so that I can benchmark these 2 functions # applyCl
@staticmethod # applyCl
def installSo(fn:str, force:bool=False): # applyCl
"""Installs dynamic library (.so file) to all nodes.
:param fn: file name of the shared library
:param force: force reinstall even if the library is already on the remote node""" # applyCl
basename = os.path.basename(fn) # applyCl
if not force and basename in _applyCl_soCache: return # applyCl
print("Installing dynamic library to all nodes... ", end=""); _applyCl_soCache.add(basename); contents = cli.cat(fn, False) # applyCl
None | applyCl.aS(lambda: contents | cli.file(basename)) | cli.ignore(); print("Done") # applyCl
def __ror__(self, it): # applyCl
timeout = self.timeout; bs = self.bs; ogF = self.ogF; preprocessF = self.preprocessF; resolveF = self.resolveF # applyCl
if bs > 1: return it | cli.batched(bs, True) | applyCl(lambda x: x | apply(ogF) | cli.aS(list), self.prefetch, timeout) | cli.joinStreams() # applyCl
def gen(it): # applyCl
futures = deque(); it = iter(it); n = self.num_cpus; limit = settings.applyCl.cpuLimit or int(1e9) # applyCl
for i, e in zip(range(min(self.prefetch, (limit-_cpuUsed.value)//n)), it): # try to anticipate how much resources can be consumed ahead of time and only schedule that much, to prevent deadlocks when multiple applyCl() is called, but their parent process has not consumed the yield statement, so the cpu count doesn't get decremented # applyCl
while _cpuUsed.value + n > limit: time.sleep(0.1) # this is a very rudimentary lock. Doesn't have to be accurate though, and Python's GIL ensure atomic-ness # applyCl
futures.append(preprocessF(e)); _cpuUsed.value += n # applyCl
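# steady state below: each yielded result frees one slot in the prefetch window and a new job is scheduled in its place (net cpu usage unchanged, so no extra bookkeeping needed); once the input runs dry, the remaining futures are drained and the cpu counter decremented as they resolve # applyCl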
for e in it: yield resolveF(futures.popleft()); futures.append(preprocessF(e)) # free and allocate cpu slot immediately, so no while loop necessary # applyCl
for e in futures: res = resolveF(e); _cpuUsed.value -= n; yield res # applyCl
applyCl.rorGen = gen # applyCl
return gen(it) # applyCl
def __invert__(self): # applyCl
"""Expands the arguments out, just like :class:`apply`.
Example::
# returns [20, 20, 18, 14, 8, 0, -10, -22, -36, -52]
[range(10), range(20, 30)] | transpose() | ~applyCl(lambda x, y: y-x**2) | deref()""" # applyCl
f, rest, kwargs = self._copyCtx(); return applyCl(lambda x: f(*x), *rest, **kwargs) # applyCl
@staticmethod # applyCl
def nodeIds(includeSelf=True) -> List[str]: # applyCl
"""Returns a list of all node ids in the current cluster.
Example::
applyCl.nodeIds() # returns something like ['7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d', '1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068']
If you want to get nodes' metadata, then just use ray's builtin function ``ray.nodes()``
:param includeSelf: whether to include node id of the current process or not""" # applyCl
res = ray.nodes() | cli.filt(lambda x: x["Alive"]) | apply(lambda x: x["NodeID"]) | aS(list) # applyCl
if includeSelf: return res # applyCl
res.remove(applyCl.nodeId()); return res # applyCl
@staticmethod # applyCl
@lru_cache # applyCl
def nodeId() -> str: # applyCl
"""Returns current node id""" # applyCl
return ray.runtime_context.get_runtime_context().get_node_id() # applyCl
@staticmethod # applyCl
def cpu() -> int: # applyCl
"""Grabs the number of cpus available on this node""" # applyCl
return int(applyCl.meta()["Resources"]["CPU"]) # applyCl
@staticmethod # applyCl
def aS(f, **kwargs): # applyCl
"""Executes function f once for all node ids that are piped in.
Example::
# returns [['1051da...', ['Desktop', 'Downloads']], ['7bb387...', ['Pictures', 'Music']]]
applyCl.nodeIds() | applyCl.aS(lambda: None | cmd("ls ~") | deref()) | deref()
# same as above, demonstrating passing in a list of nodeIds
["1051da...", "7bb387..."] | applyCl.aS(lambda: None | cmd("ls ~") | deref()) | deref()
# same as above, demonstrating passing in "None" for all nodeIds in the cluster
None | applyCl.aS(lambda: None | cmd("ls ~") | deref()) | deref()
If you want to execute f for all nodes, you can pass in None instead.
As a reminder, this kinda follows the same logic as the popular cli :class:`aS`, where
f is executed once, hence the name "apply Single". Here, the meaning of "single" is
different. It just means execute once for each node id. If you want to quickly execute
a function on a single node, without all the fuss, there's this short form that you can follow::
# returns ['Desktop', 'Downloads'], demonstrating that you can also pass in a single node id
"1051da..." | applyCl.aS(lambda: None | cmd("ls ~") | deref()) | deref()
:param f: main function to execute in each node. Not supposed to accept any arguments
:param kwargs: keyword arguments for the main :class:`applyCl` function""" # applyCl
f = fastF(f); final = apply(lambda x: [x, 0]) | applyCl(lambda _: f(), pre=True, **kwargs) # applyCl
def inner(it): # applyCl
shortform = False # applyCl
if it is None: it = applyCl.nodeIds() # applyCl
if isinstance(it, str): it = [it]; shortform = True # applyCl
if it | ~cli.inSet(_nodeIdsCache()) | cli.shape(0) > 0: # caching nodeIds(), because that takes a surprising amount of time # applyCl
_nodeIdsCache.value = applyCl.nodeIds(); outliers = it | ~cli.inSet(_nodeIdsCache()) | cli.deref() # applyCl
if len(outliers) > 0: raise Exception(f"These nodes cannot be found: {outliers}") # applyCl
return it | final | ((cli.cut(1) | cli.item()) if shortform else cli.iden()) # applyCl
return aS(inner) # applyCl
@staticmethod # applyCl
def cmd(s:str, sudo=False, nodeIds=None, **kwargs): # applyCl
"""Convenience function to execute shell command on all nodes.
Example::
applyCl.cmd("mkdir -p /some/folder")
It returns [[nodeid1, output1], [nodeid2, output2]]. If you need more flexibility,
fall back to :meth:`applyCl.aS`
:param s: shell command to execute
:param sudo: if True, will execute the command with sudo privileges. Will ask for the password
and then cache it internally for 5 minutes
:param nodeIds: node ids to execute the command on. If None, executes on all nodes in the cluster
:param kwargs: keyword arguments to pass to :class:`applyCl`""" # applyCl
global _password; import getpass # applyCl
if sudo: # applyCl
if _password() is None: # applyCl
print("Enter password:"); _password.value = getpass.getpass(prompt="") # applyCl
return nodeIds | applyCl.aS(lambda: _password() | cli.cmd(f"sudo -S {s}") | cli.deref(), **kwargs) | cli.deref() # applyCl
else: return nodeIds | applyCl.aS(lambda: None | cli.cmd(s) | cli.deref(), **kwargs) | cli.deref() # applyCl
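# A hedged usage sketch (not part of the library source): running a privileged command on every
# node, then an unprivileged one on a subset of nodes. The package name and node id are illustrative.
#
#   applyCl.cmd("apt-get install -y rsync", sudo=True)   # asks for the password once, then caches it
#   applyCl.cmd("df -h", nodeIds=["1051da..."])          # restricts execution to specific nodes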
[docs] @staticmethod # applyCl
def lookup(): # applyCl
"""Tries to lookup a particular file to see on which node it's at.
Example::
# returns [[nodeId, "something.txt"], [nodeId, "abc.jpg"]]
["something.txt", "abc.jpg"] | applyCl.lookup()
# returns [nodeId, "something.txt"]
"something.txt" | applyCl.lookup()
Files that don't exist won't be included in the result, and files that
exist on multiple nodes will be returned multiple times. The output format
is such that you can pipe it into ``applyCl(..., pre=True)`` and have it execute
some function that you want. This is pretty much just a convenience function.""" # applyCl
def inner(fns): # applyCl
fns = fns | cli.deref(); single = isinstance(fns, str) # applyCl
if single: fns = [fns] # applyCl
ans = None | applyCl.aS(lambda: fns | cli.iden() & (apply(os.path.expanduser) | apply(os.path.exists)) | cli.transpose() | cli.filt("x", 1) | cli.cut(0) | cli.deref()) | cli.ungroup() | cli.deref() # applyCl
return ans[0] if single else ans # applyCl
return cli.aS(inner) # applyCl
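# A hedged sketch (not part of the library source) of the "pipe into applyCl(..., pre=True)" pattern
# mentioned above: locate files, then run a function on whichever node actually holds each file.
#
#   ["something.txt", "abc.jpg"] | applyCl.lookup() \
#       | applyCl(lambda fn: os.path.getsize(os.path.expanduser(fn)), pre=True) | cli.deref()
#   # would return something like [[nodeId, 1024], [nodeId, 20480]]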
[docs] @staticmethod # applyCl
def replicateFile(fn:str, nodeIds=None): # applyCl
"""Replicates a specific file in the current node to all the other nodes.
Example::
applyCl.replicateFile("~/cron.log")
Internally, this will read chunks of 100kB of the specified file and dump them
incrementally to all other nodes, which has implications on performance. To
increase or decrease the chunk size, check out :class:`~k1lib.cli.inp.cat`. This also means
you can replicate arbitrarily large files around as long as you have the disk
space for them, while RAM size doesn't really matter.
Please note that this operation is not symmetric. Unlike :meth:`balanceFile` and
:meth:`balanceFolder`, which can be invoked on any node and will roughly
do the same thing (rebalance everything out), this operation can do totally
different things depending on which node you run it on. Let's say the file exists
on nodes A and B, but not on nodes C and D. If you run this function on either
node A or B, it will replicate the file to C and D. However, if you run this
function on node C or D, it will instead throw an error since the file doesn't
exist there.
:param fn: file name
:param nodeIds: nodes to copy the file to. If None, detects which nodes don't already
have a copy of the same size and copies to those""" # applyCl
fn = os.path.expanduser(fn); dirname = os.path.dirname(fn) # applyCl
# checking if there's an existing file already. If there is, then don't try to copy data to that node # applyCl
if nodeIds is None: canSize = os.path.getsize(fn); nodeIds = None | applyCl.aS(lambda: os.path.getsize(fn) if os.path.exists(fn) else 0) | cli.filt(cli.op() != canSize, 1) | cli.cut(0) | cli.deref() # applyCl
nodeIds = nodeIds | cli.wrapList().all() | cli.deref() # applyCl
nodeIds | cli.insert(None, False).all() | applyCl(lambda _: None | cli.cmd(f"mkdir -p {dirname}; rm -rf {fn}") | cli.deref(), pre=True) | cli.deref() # applyCl
for chunk in cli.cat(fn, text=False, chunks=True): nodeIds | cli.insert(chunk, False).all() | applyCl(lambda chunk: chunk >> cli.file(fn) | cli.deref(), pre=True) | cli.deref() # applyCl
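# A hedged usage sketch (not part of the library source): replicating a file to a hand-picked set
# of nodes instead of letting the method detect which nodes need it. The path and ids are illustrative.
#
#   applyCl.replicateFile("~/models/checkpoint.pth", nodeIds=["1051da...", "7bb387..."])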
[docs] @staticmethod # applyCl
def balanceFile(fn:str, nAs:List[str]=None, nBs:List[str]=None, rS=None, chunkSize:int=100_000_000): # applyCl
"""Splits a specified file in node nAs and dumps other parts
to nodes nBs. Example::
applyCl.balanceFile("~/cron.log")
This will split the big files up into multiple segments (1 for each node). Then
for each segment, it will read through it chunk by chunk into memory, and then
deposits it into the respective nodes. Finally, it truncates the original files
down to its segment boundary.
The main goal of this is so that you can analyze a single big (say 200GB) file
quickly. If that file is on a single node, then it will take forever, even with
:class:`applyMp`. So splitting things up on multiple nodes will make analyzing
it a lot faster.
There's also the function :meth:`balanceFolder`, which has the opposite problem of
having lots of small (say 100MB) files. So it will try to move files around (keeping
them intact in the meantime) to different nodes so that the folder size ratio is
roughly proportional to the cpu count.
The exact split rule depends on the number of CPUs of each node. Best to see an
example::
Command: applyCl.balanceFile("~/cron.log")
Verbose command: applyCl.balanceFile("~/cron.log", ["1"], ["1", "2", "3", "4", "5"])
----------- Before -----------
Node: 1 2 3 4 5
Cpu: 8 12 16 8 8
Size (GB): 52 0 0 0 0
----------- After -----------
Node: 1 2 3 4 5
Cpu: 8 12 16 8 8
Size (GB): 8 12 16 8 8
This also works if you have files on existing nodes already, and are upgrading the
cluster::
Command: applyCl.balanceFile("~/cron.log")
Verbose command: applyCl.balanceFile("~/cron.log", ["1", "5"], ["1", "2", "3", "4", "5"])
----------- Before -----------
Node: 1 2 3 4 5
Cpu: 8 12 16 8 8
Size (GB): 26 0 0 26 0
----------- After -----------
Node: 1 2 3 4 5
Cpu: 8 12 16 8 8
Size (GB): 8 12 16 8 8
If you want to move files out of a node when decommissioning them, you can do
something like this::
Command: applyCl.decommissionFile("~/cron.log", ["3", "4"])
Verbose command: applyCl.balanceFile("~/cron.log", ["1", "2", "3", "4", "5"], ["1", "2", "5"])
----------- Before -----------
Node: 1 2 3 4 5
Cpu: 8 12 16 8 8
Size (GB): 8 12 16 8 8
----------- After -----------
Node: 1 2 3 4 5
Cpu: 8 12 16 8 8
Size (GB): 15 22 0 0 15
Remember that the node ids "1", etc. are for illustrative purposes only. You should get
real node ids from :meth:`nodeIds`.
Why is the file size proportional to the number of cores on each node? Well, if
you have more cores, you should be able to process more, so as to make everything
balanced, right?
Again, this means that you can split arbitrarily large files as long as you have
the disk space for them; RAM size is not a concern. How does this perform? Not
the best in the world if you don't have a lot of nodes. With SATA 3 SSDs and 750MB/s
ethernet, I got transfer speeds of roughly 100MB/s. This should increase as you
have more nodes based on the code structure, but I haven't tested it yet. Can
it be faster? Definitely. Am I willing to spend time optimizing it? No.
:param fn: file name
:param nAs: node ids that currently stores the file. If not specified, try to detect
what nodes the file exists in
:param nBs: node ids that will store the file after balancing everything out. If not
specified, will take all available nodes
:param rS: :class:`~k1lib.cli.inp.refineSeek` instance, if you need more fine-grained
control over section boundaries so as to not make everything corrupted
:param chunkSize: see :meth:`balanceFolder`
""" # applyCl
from k1lib.cli._applyCl import balanceFile # applyCl
with settings.cat.context(chunkSize=chunkSize): balanceFile(fn, nAs, nBs, rS) # applyCl
[docs] @staticmethod # applyCl
def decommissionFile(fn, nAs:List[str], rS=None, chunkSize:int=100_000_000): # applyCl
"""Convenience function for :meth:`balanceFile`. See docs over there.""" # applyCl
from k1lib.cli._applyCl import balanceFile # applyCl
with settings.cat.context(chunkSize=chunkSize): balanceFile(fn, None, applyCl.nodeIds() | ~cli.inSet(nAs) | cli.deref(), rS) # applyCl
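# A hedged usage sketch (not part of the library source): moving a distributed file's segments off
# nodes that are about to be removed from the cluster. The node ids are illustrative.
#
#   applyCl.decommissionFile("~/cron.log", ["3", "4"])   # afterwards "3" and "4" hold no part of the file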
[docs] @staticmethod # applyCl
def cat(fn:str=None, f:Callable=None, nodeIds=None, timeout:float=60, pre:bool=False, multiplier:int=1, includeId:bool=False, resolve:bool=True): # applyCl
"""Reads a file distributedly, does some operation on them, collects and
returns all of the data together. Example::
fn = "~/repos/labs/k1lib/k1lib/cli/test/applyCl.cat.data"
("0123456789"*5 + "\\n") * 1000 | file(fn)
applyCl.balanceFile(fn)
applyCl.cat(fn, shape(0), pre=True) | deref()
That returns something like this (for a 2-node cluster, with 2 (node A) and 4 (node B) cpus respectively)::
[['7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d', 167],
['7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d', 167],
['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 166],
['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 167],
['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 166],
['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 167]]
Here, we're creating an initial file with 1000 lines. Then we'll split it up into
2 fragments: 334 lines and 666 lines and store them on the respective nodes. Then,
on node A, we'll split the file up into 2 parts, each with 167 lines. On node B,
we'll split the file up into 4 parts, each with around 166 lines. Then we'll
schedule 6 processes total, each dealing with 166 lines. After all of that, results
are collected together and returned.
If you want to distinguish between different processes inside f, for example you
want to write results into different files, you can do something like this::
dir_ = "~/repos/labs/k1lib/k1lib/cli/test"
fn = f"{dir_}/applyCl.cat.data"
applyCl.cmd(f"rm -r {dir_}/applyCl") # clear out old folders
applyCl.cmd(f"mkdir -p {dir_}/applyCl") # creating folders
# do processing on fn distributedly, then dump results into multiple files
applyCl.cat(fn, ~aS(lambda idx, lines: lines | shape(0) | aS(dill.dumps) | file(f"{dir_}/applyCl/{idx}.pth")), includeId=True) | deref()
# reading all files and summing them together
None | applyCl.aS(lambda: ls(f"{dir_}/applyCl")) | ungroup() | applyCl(cat(text=False) | aS(dill.loads), pre=True) | cut(1) | toSum()
.. admonition:: Simple mode
There's also another mode that's activated whenever f is not specified that feels
more like vanilla :class:`~k1lib.cli.inp.cat`. Say you have a file on a specific node::
nodeId = "7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d"
fn = "~/ssd2/randomFile.txt"
# -------------- file is on current node --------------
cat(fn) # returns iterator of lines inside the file
fn | cat() # same thing as above
# -------------- file is on remote node --------------
[nodeId, fn] | applyCl.cat() # returns iterator of lines of the file
applyCl.cat([nodeId, fn]) # same thing
nodeId | applyCl.cat(fn) # also same thing
So yeah, there're lots of ways to just simply read a file on a remote node. Is
it too much? Probably, but the good thing is that you can pick any that's intuitive
for you. Note that this mode is just for convenience only, for when you want to do
exploratory analysis on a single remote file. To be efficient at bulk processing,
use the normal mode instead.
:param fn: file name
:param f: function to execute in every process
:param nodeIds: only read file from these nodes
:param timeout: kills the processes if it takes longer than this amount of seconds
:param pre: "preserve" mode, just like in :class:`applyCl`. Whether to keep the node id column or not
:param multiplier: by default, each node will spawn as many processes as there
are cpus. Sometimes you want to spawn more processes; change this to a higher number
:param includeId: includes a unique id for this process (just normal integers from 0 to n)
:param resolve: whether to resolve the remote objects or not
""" # applyCl
fn = os.path.expanduser(fn) if fn is not None else None # applyCl
if f is None: # simple case # applyCl
def inner(nodeId_fn:Tuple[str, str]): # applyCl
nodeId, fn = nodeId_fn; seeks = [nodeId] | applyCl.aS(lambda: fn | cli.splitSeek(round(os.path.getsize(fn)/settings.cat.chunkSize+1))) | cli.cut(1) | cli.item() | cli.deref() # applyCl
inter = seeks | cli.window(2) | apply(cli.wrapList() | cli.insert(nodeId)) | cli.deref() # applyCl
return inter | ~applyCl(lambda sB, eB: cli.cat(fn,sB=sB,eB=eB) | cli.deref(), pre=True) | cli.cut(1) | cli.joinStreams() # applyCl
# return [nodeId_fn] | applyCl(cat() | deref(), pre=True) | cut(1) | item() # direct, no chunking method # applyCl
if fn is None: return aS(inner) # [nodeId, fn] | applyCl.cat() # applyCl
if isinstance(fn, str): return aS(lambda nodeId: inner([nodeId, fn])) # nodeId | applyCl.cat() # applyCl
else: return inner(fn) # applyCl.cat([nodeId, fn]) # applyCl
nodeIds = nodeIds or (applyCl.nodeIds() | applyCl.aS(lambda: os.path.exists(fn)) | cli.filt(cli.op(), 1) | cli.cut(0) | cli.deref()) # applyCl
checkpoints = nodeIds | applyCl.aS(lambda: fn | cli.splitSeek(int(applyCl.meta()["Resources"]["CPU"]*multiplier)) | cli.window(2) | cli.deref()) | cli.ungroup() | cli.insertIdColumn(True, False) | ~apply(lambda x,y,z: [x,[*y,z]]) | cli.deref() # applyCl
return checkpoints | applyCl(~aS(lambda x,y,idx: cli.cat(fn, sB=x, eB=y) | ((cli.wrapList() | cli.insert(idx)) if includeId else cli.iden()) | f), pre=True, timeout=timeout, num_cpus=1, resolve=resolve) | (cli.iden() if pre else cli.cut(1)) # applyCl
[docs] @staticmethod # applyCl
def replicateFolder(folder:str, nodeIds=None): # applyCl
"""Replicates a specific folder in the current node to all the other nodes.
Example::
applyCl.replicateFolder("~/ssd2/data/owl")
This just lists out all files recursively in the specified folder, then replicates each file using :meth:`replicateFile`""" # applyCl
applyCl.getFilesInFolder(folder) | applyCl(lambda fn: applyCl.replicateFile(fn, nodeIds), num_cpus=0.1) | cli.deref() # applyCl
[docs] @staticmethod # applyCl
def balanceFolder(folder:str, maxSteps:int=None, audit:bool=False, bs:int=5, chunkSize:int=100_000_000): # applyCl
"""Balances all files within a folder across all nodes.
Example::
# make the chunk size huge so that transfers become faster
settings.cli.cat.chunkSize = 100_000_000
base = "~/repos/labs/k1lib/k1lib/cli/test/applyCl.balance"
# deletes old structures and making test folder
applyCl.cmd(f"rm -r {base}"); applyCl.cmd(f"mkdir -p {base}")
# creates 20 files of different sizes and dump it in the base folder of the current node
torch.linspace(1e4, 1e5, 20).int() | apply(lambda x: "x"*x) | insertIdColumn() | ~apply(lambda idx, contents: contents | file(f"{base}/{idx}.txt")) | deref();
# transfers files between nodes such that the total folder size is proportional to the number of cpus across nodes
applyCl.balanceFolder(base)
# get folder size of all nodes
None | applyCl.aS(lambda: ls(base) | apply(os.path.getsize) | toSum()) | deref()
# creates 20 additional files and dump it to the current node
torch.linspace(1e4, 1e5, 20).int() | apply(lambda x: "x"*x) | insertIdColumn() | ~apply(lambda idx, contents: contents | file(f"{base}/{idx+20}.txt")) | deref();
# balances the tree out again
applyCl.balanceFolder(base)
# get folder size of all nodes
None | applyCl.aS(lambda: ls(base) | apply(os.path.getsize) | toSum()) | deref()
So imagine that you just downloaded 1000 files to a single node on a specific folder,
but you need to analyze all of them in a distributed manner. What you can do is to
move some files to other nodes and then do your analysis. If you want to download
more files, just dump it to any node (or download distributed across all nodes),
then rebalance the folders and do your analysis.
Also, internally, it splits files into multiple chunks, transfers the chunks to other
nodes and appends them to the correct files. It uses :meth:`~k1lib.cli.inp.cat` to split up
the file, which has settings under ``settings.cli.cat``. By default, the chunk size is
100kB, which I think is the sweet spot because :meth:`~k1lib.cli.inp.cat` also
supports remote files accessed from the internet and sometimes the library is used on
systems with very little memory. But for this use case, where you already have the insane
hardware for this, 100kB is extremely small and will slow transfer rates to a crawl,
so in this function, it will temporarily be set to the parameter ``chunkSize``, which
is 100MB by default.
:param folder: folder to rebalance all of the files
:param maxSteps: what's the maximum number of file transfers? By default there's no limit, so files are transferred until the folder sizes are balanced out across nodes
:param audit: if True, don't actually move files around and just return what files are going to be moved where
:param bs: batch size for transporting this many files at once. Increase to make it faster, but with the
penalty of the progress bar not updating as frequently
:param chunkSize: file chunk size to split up and send to other nodes
""" # applyCl
from k1lib.cli._applyCl import balanceFolder # applyCl
with settings.cat.context(chunkSize=chunkSize): return balanceFolder(folder, audit, maxSteps, bs=bs) # applyCl
[docs] @staticmethod # applyCl
def decommissionFolder(folder:str, nAs:List[str], maxSteps:int=10000, audit:bool=False, timeout:float=3600, bs:int=5, chunkSize:int=100_000_000): # applyCl
"""Like :meth:`decommissionFile`, but works for distributed folders instead.
:param nAs: list of node ids to migrate files away from
:param maxSteps: limits the total number of optimization steps. Normally don't have to specify,
but just here in case it runs for too long trying to optimize the folder structure
:param audit: if True, just returns the file movements it's planning to do
:param bs: batch size for transporting this many files at once. Increase to make it faster, but with the
penalty of the progress bar not updating as frequently
:param chunkSize: see :meth:`balanceFolder`
""" # applyCl
from k1lib.cli._applyCl import decommissionFolder # applyCl
with settings.cat.context(chunkSize=chunkSize): return decommissionFolder(folder, nAs, audit=audit, maxSteps=maxSteps, timeout=timeout, bs=bs) # applyCl
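# A hedged usage sketch (not part of the library source): draining all files in a distributed folder
# away from the current node before shutting it down. The path is illustrative.
#
#   applyCl.decommissionFolder("~/ssd2/data", [applyCl.nodeId()], audit=True)   # preview planned moves
#   applyCl.decommissionFolder("~/ssd2/data", [applyCl.nodeId()])               # actually move the files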
[docs] @staticmethod # applyCl
def pruneFolder(folder): # applyCl
"""Removes empty directories recursively from a root folder.""" # applyCl
def inner(folder): # applyCl
folder = os.path.expanduser(folder) # applyCl
dirs, files = folder | cli.ls() | cli.filt(os.path.isdir).split() | cli.deref() # applyCl
if len(files) > 0: return # applyCl
dirs | apply(inner) | cli.ignore() # recurse into subdirectories first # applyCl
if folder | cli.ls() | cli.shape(0) == 0: None | cli.cmd(f"rm -rf {folder}") | cli.ignore() # applyCl
None | applyCl.aS(lambda: inner(folder)) | cli.deref() # applyCl
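# A hedged usage sketch (not part of the library source): after decommissioning files away from
# some nodes, empty directories can be left behind; this cleans them up on every node.
#
#   applyCl.decommissionFolder("~/ssd2/data", [applyCl.nodeId()])
#   applyCl.pruneFolder("~/ssd2/data")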
[docs] @staticmethod # applyCl
def diskScan(folder:str, raw=False, accurate=True, f=None): # applyCl
"""Scans for files and folders in the specified folder for potential
distributed files and folders. A distributed file is a file that exists on more
than 1 node. A distributed folder is a folder that exists on more than 1
node and does not have any shared children. Example::
applyCl.diskScan("~/ssd2")
applyCl.diskScan("~/ssd2", True)
The first line does not return anything, but will print out something like this:
.. include:: ../literals/diskScan.rst
While the second line will return a parseable data structure instead::
[[['/home/kelvin/ssd2/data/genome/RegulationFeatureActivity', [4113489746, 7912834090, 4164314316]],
['/home/kelvin/ssd2/data/genome/go/release_geneontology_org', [2071645117, 4172737915, 2107005131]],
['/home/kelvin/ssd2/data/genome/RegulationFeatureActivity.backup', [568878496, 552888466, 600610083]],
['/home/kelvin/ssd2/data/genome/00-common_all.idx', [341738564, 671136833, 0]],
['/home/kelvin/ssd2/data/genome/genbank/ch1.dat.gz', [25356744, 0, 25356764]],
['/home/kelvin/ssd2/test', [136152, 273530, 136351]],
['/home/kelvin/ssd2/data/genome/genbank/ch1', [0, 0, 0]]],
[['/home/kelvin/ssd2/data/genome/dummy.txt', [1101, 1101, 1101]]],
[['/home/kelvin/ssd2/data/genome/00-All.vcf', [32737509360, 65475018903, 32737509588]],
['/home/kelvin/ssd2/data/genome/MotifFeatures/homo_sapiens.GRCh38.motif_features.gff', [13963854962, 27927709895, 13963854962]],
['/home/kelvin/ssd2/data/genome/00-common_all.vcf', [2353901811, 4707803470, 2353901831]]]]
Remember that since an operating system usually has lots of shared files
(like "~/.bashrc", for example), these might be mistaken as a distributed file.
Make sure to only scan folders that you store data in, or else it'll take a long time to return.
:param folder: the folder to scan through
:param raw: whether to return raw data or display it out nicely
:param accurate: if True, returns the size when you read all files into RAM. If False,
returns the size occupied by the entire file/folder on disk (will be larger because files
are arranged into different blocks in the underlying disk)
:param f: optional post process function applied after getting the raw results, if ``raw=False``""" # applyCl
from k1lib.cli._applyCl import diskScan4, diskScan5 # applyCl
if raw: return diskScan4(folder, accurate=accurate) # applyCl
else: return diskScan5(folder, accurate=accurate, f=(f or cli.iden())) # applyCl
[docs] @staticmethod # applyCl
def balancedNodeIds(): # applyCl
"""Returns a stream of node ids that's balanced based on cpu count/performance.
Example::
# returns list of 10 node ids: ["abc...", "def...", "abc...", ...]
applyCl.balancedNodeIds() | head() | deref()
""" # applyCl
from k1lib.cli._applyCl import balancedNodeIds # applyCl
return balancedNodeIds() # applyCl
[docs] @staticmethod # applyCl
def balancedCpus(): # applyCl
"""Returns Dict[nodeId (str) -> #cpu (int))]. Could be useful to know
how much to split up files and folders according to your custom rules. Example::
# returns {"abc...": 8, "def...": 19, "ghi...": 88} for 7700 (4c8t), 10700k (8c16t) and 13900k (24c32t)
applyCl.balancedCpus()
""" # applyCl
from k1lib.cli._applyCl import loadTestGuard # applyCl
return loadTestGuard() # applyCl
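# A hedged usage sketch (not part of the library source): turning the returned dict into split
# fractions, e.g. to decide how much of a dataset each node should receive. Numbers are illustrative.
#
#   cpus = applyCl.balancedCpus()                       # e.g. {"abc...": 8, "def...": 19, "ghi...": 88}
#   total = sum(cpus.values())
#   fractions = {k: v/total for k, v in cpus.items()}   # e.g. {"abc...": 0.07, "def...": 0.17, ...}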
[docs] @staticmethod # applyCl
def loadTest(): # applyCl
"""Performs a load test on the cluster.
Example::
applyCl.loadTest()
What is a load test? It basically tries to perform some intensive and
long-running calculations on all processes on all nodes in the cluster
to know how good each individual node is. This is useful information
because whenever you try to split a file up to form a distributed file,
or move files in a folder around to form a distributed folder, the amount
of data each node gets is going to be proportional to this performance
information. More powerful nodes will have more data to process, so that
the total running time across all nodes is going to roughly be the same.
But isn't cpu count good enough for this? No, not actually. The i7 7700
has 4 cores, 8 threads, and the i9 13900k has 8 performance cores and 16
efficiency cores, totalling 32 threads. You would expect the
13900k to be 4x (32/8=4) or 6x (24/4=6) more powerful than the 7700, but
it's actually 10x more powerful.
The test itself takes around 1-2 minutes, and the test results are going
to be saved locally in the folder "~/.k1lib/", so that it can use that
info directly in future runs.""" # applyCl
from k1lib.cli._applyCl import loadTest # applyCl
return loadTest() # applyCl
[docs] @staticmethod # applyCl
def getFolderSize(folder:str=None) -> int: # applyCl
"""Shortcut function to get size of a folder on the current node.""" # applyCl
from k1lib.cli._applyCl import getFolderSize # applyCl
if folder is None: return getFolderSize # applyCl
return folder | getFolderSize # applyCl
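# A hedged usage sketch (not part of the library source): checking how a distributed folder's bytes
# are spread across the cluster by running getFolderSize on every node via applyCl.aS.
#
#   None | applyCl.aS(lambda: applyCl.getFolderSize("~/ssd2/data")) | cli.deref()
#   # would return something like [['7bb387...', 52_000_000_000], ['1051da...', 48_000_000_000]]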
[docs] @staticmethod # applyCl
def getFilesInFolder(folder:str=None): # applyCl
"""Shortcut function to recursively list out all files inside a folder on the current node.""" # applyCl
from k1lib.cli._applyCl import getFilesInFolder # applyCl
if folder is None: return getFilesInFolder # applyCl
return folder | getFilesInFolder # applyCl
if hasRay: # applyCl
@ray.remote(num_cpus=0) # applyCl
class Storage: # a wrapper for specific objects, kinda like ObjectRef, but it's an ObjectRef in my control. Should not be serialized to every other place # applyCl
def __init__(self, v=None): # applyCl
self.lockExecute = threading.Lock(); self.lockIncref = threading.Lock(); self.lockDecref = threading.Lock() # applyCl
self.nodeId = applyCl.nodeId(); self.d = {}; self.refs = defaultdict(lambda: 0) # applyCl
self.autoInc = k1lib.AutoIncrement(prefix="_d") # applyCl
self.idx = f"{self.nodeId}_" + f"{random.random()}"[:10] + f"_{time.time()}" # applyCl
self.idx2 = f"{random.randint(100, 900)}" # applyCl
self.deposit(v) # applyCl
def getIdx(self): return self.idx # applyCl
def getMeta(self): return [self.nodeId, self.idx, self.idx2] # applyCl
def lookup(self, idx:str): return self.d[idx] # applyCl
def remove(self, idx:str): del self.d[idx] # applyCl
def keys(self): return list(self.d.keys()) # applyCl
def incref(self, idx:str): # applyCl
# with self.lockIncref: # applyCl
if idx is not None: # applyCl
# requests.get(f"http://192.168.1.133:8892/increfd/{self.idx2}/{idx}") # applyCl
if self.refs[idx] <= 0: raise Exception(f"incref-ing object {self.idx2}/{idx} that has already been deleted") # applyCl
self.refs[idx] += 1#; return self # applyCl
def decref(self, idx:str): # applyCl
# with self.lockDecref: # applyCl
if idx is not None: # applyCl
self.refs[idx] -= 1; v = self.d[idx] # applyCl
if self.refs[idx] == 0: self.remove(idx) # applyCl
if self.refs[idx] < 0: raise Exception(f"decref-ing object {self.idx2}/{idx} that no longer exists") # applyCl
# requests.get(f"http://192.168.1.133:8892/decrefd/{self.idx2}/{idx}")#; return self # applyCl
def deposit(self, v:"1/or1/h1", s:"Storage"=None) -> "idx:str": # injecting s into v if v is a Handle # applyCl
if isinstance(v, Handle): v.setStorage(s); v = v.get() # applyCl
if isinstance(v, ray.ObjectRef): v = ray.get(v) # applyCl
idx = self.autoInc() # applyCl
# requests.get(f"http://192.168.1.133:8892/deposit/{self.idx2}/{idx}/{v}"[:100]) # applyCl
self.d[idx] = v; self.refs[idx] += 1; return idx # applyCl
def execute(self, f, idx:str, idxCtxHandle:str=None) -> "idx:str": # applyCl
# requests.get(f"http://192.168.1.133:8892/execute/{self.idx2}/{idx}") # applyCl
# if idxCtxHandle: f.__globals__["ctxHandle"] = self.d[idxCtxHandle] # injecting in global variable # applyCl
with self.lockExecute: return self.deposit(f(self, self.d[idx], idxCtxHandle)) # executing "pure" f with some argument taken from the storage # applyCl
def __getstate__(self): raise Exception("Can't be serialized!") # applyCl
else: # applyCl
class Storage: pass # applyCl
_storages = {} # nodeId -> {idx: int, ss: [Storage]}, idx for current index to yield the storage # applyCl
_cpuCounts = {} # nodeId -> int # applyCl
_nodeIdsGen = None # applyCl
def getStorage(nodeId:str=None, num_gpus=0) -> Storage: # getStorage
"""Handles creating storage contexts. This is created mainly because it's
costly to actually instantiate a new actor (1-2 seconds/actor!), so this will # getStorage
spawn new storage contexts till the cpu count of each node is reached. From
then on, it will keep reusing old storage contexts""" # getStorage
global _nodeIdsGen; _nodeIdsGen = _nodeIdsGen or applyCl.balancedNodeIds() # getStorage
nodeId = nodeId or next(_nodeIdsGen) # getStorage
if num_gpus == 0: # getStorage
if nodeId not in _storages or nodeId not in _cpuCounts: # getStorage
_storages[nodeId] = {"idx": 0, "ss": []} # getStorage
_cpuCounts[nodeId] = nodeId | applyCl.aS(lambda: os.cpu_count()) # getStorage
if len(_storages[nodeId]["ss"]) < _cpuCounts[nodeId]: # add new storage context # getStorage
_storages[nodeId]["ss"].append(specificNode(Storage, nodeId).remote()) # getStorage
idx = _storages[nodeId]["idx"]; res = _storages[nodeId]["ss"][idx] # getStorage
_storages[nodeId]["idx"] = (idx+1)%_cpuCounts[nodeId]; return res # getStorage
else: return specificNode(Storage, nodeId, num_gpus).remote() # getStorage
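# A hedged sketch (not part of the library source) of the reuse behaviour described above: new
# Storage actors are only created until a node's cpu count is reached, after which calls cycle
# through the existing actors round-robin.
#
#   nid = applyCl.nodeId()
#   actors = [getStorage(nid) for _ in range(2 * applyCl.cpu())]
#   # roughly `applyCl.cpu()` distinct actors, each handed out about twice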
def extractStorage(x): return x.storage if isinstance(x, Handle) else None # extractStorage
class Handle: # specific object in storage, pretty much ObjectRef that I'm in control of. Can be serialized to every other place # Handle
def __init__(self, storage, idx:str=None, _or=None): # Handle
self.storage = storage; self.idx = idx; self._or = _or # Handle
self.nodeId, self.storageId, self.idx2 = ray.get(storage.getMeta.remote()); self.weakref = False # whether this Handle should decrement reference count of storage element or not # Handle
@staticmethod # Handle
def create(v, nodeId:str=None, num_gpus=0) -> "Handle": # creates new storage and put value into it # Handle
s = getStorage(nodeId, num_gpus); return Handle(s, ray.get(s.deposit.remote(v, extractStorage(v)))) # Handle
def deposit(self, v) -> "Handle": # put new value into this handle's Storage # Handle
return Handle(self.storage, ray.get(self.storage.deposit.remote(v, extractStorage(v)))) # Handle
def execute(self, f, s:"Storage"=None, kwargs=None) -> "Handle": # f is a normal function. Blocks until finished executing # Handle
if kwargs: # Handle
kwargs.pop("num_gpus", None) # Handle
@ray.remote(**kwargs) # Handle
def inner(sto, s): return sto.execute.remote(f, self.idx) # Handle
return Handle(self.storage, ray.get(ray.get(inner.remote(self.storage, s)))) # Handle
else: return Handle(self.storage, ray.get(self.storage.execute.remote(f, self.idx))) # Handle
def block(self) -> "Handle": # block execution and finalize Handle's state # Handle
if self._or: self.idx = ray.get(self._or) # Handle
return self # Handle
def executeAsync(self, f, kwargs=None, idxCtxHandle:str=None) -> "Handle": # f is a normal function. Returns immediately, finalize it by calling .block() # Handle
# requests.get(f"https://logs.mlexps.com/async12/{self.idx}/N/{self.idx2}") # Handle
if kwargs: # idxCtxHandle is the (optional) string of the background handle object (left arg in pre=True mode), to be dynamically injected into the "ctxHandle" global variable # Handle
# i1 = self.idx; i2 = self.idx2 # Handle
kwargs.pop("num_gpus", None) # Handle
@ray.remote(**kwargs) # Handle
def inner(sto): # Handle
# requests.get(f"https://logs.mlexps.com/async56/{i1}/N/{i2}") # Handle
return ray.get(sto.execute.remote(f, self.idx, idxCtxHandle)) # Handle
# requests.get(f"https://logs.mlexps.com/async34/{self.idx}/N/{self.idx2}") # Handle
return Handle(self.storage, _or=inner.remote(self.storage)) # Handle
else: return Handle(self.storage, _or=self.storage.execute.remote(f, self.idx, idxCtxHandle)) # Handle
def get(self): return ray.get(self.storage.lookup.remote(self.idx)) # Handle
def setStorage(self, s): # inject storage dependency into this handle, increment storage's refcount # Handle
if s: self.storage = s; self.storage.incref.remote(self.idx); self.weakref = False # Handle
def __repr__(self): return f"<Handle idx={self.idx} storage={self.storageId}>" # Handle
def __getstate__(self): d = dict(self.__dict__); d["storage"] = None; return d # Handle
def __setstate__(self, d): self.__dict__.update(d); self.weakref = True # reconstructed Handles don't decrement reference count of variable, because Actors can't be serialized # Handle
# def report(self, s): requests.get(f"http://192.168.1.133:8892/{s}/{self.idx2}/{self.idx}") # Handle
def __del__(self): # Handle
# requests.get(f"http://192.168.1.133:8892/handdel/{self.idx2}/{self.idx}") # Handle
# print(f"storage: {self.storage}, nodeId: {self.nodeId}") # Handle
if not self.weakref: self.storage.decref.remote(self.idx) # Handle
@lru_cache # Handle
def storageWarmup(): print("Warming up distributed storage..."); None | applyCl.aS(lambda: os.cpu_count()) | ~apply(lambda x,y: [x]*y) | cli.joinSt() | apply(getStorage) | cli.ignore(); print("Finished warming up") # storageWarmup
def storageSize(): return _storages.values() | cli.op()["ss"].all() | cli.joinSt() | apply(lambda s: ray.get(s.keys.remote())) | cli.joinSt() | cli.shape(0) # storageSize
thEmptySentinel = object() # storageSize
[docs]class applyTh(BaseCli): # applyTh
blurb="Applies a function to all input elements across multiple threads" # applyTh
[docs] def __init__(self, f, prefetch:int=None, timeout:float=5, bs:int=1, sync=True, **kwargs): # applyTh
"""Kinda like the same as :class:`applyMp`, but executes ``f`` on multiple
threads, instead of on multiple processes. Advantages:
- Relatively low overhead for thread creation
- Fast, if ``f`` is io-bound
- Does not have to serialize and deserialize the result, meaning iterators can be
exchanged
Disadvantages:
- Still has thread creation overhead, so it's still recommended to specify ``bs``
- Is slow if ``f`` has to obtain the GIL to be able to do anything
All examples from :class:`applyMp` should work perfectly here.
:param prefetch: how many results to execute ahead of time
:param timeout: kills the thread if it takes longer than this amount
:param bs: how much to bunch function calls together
:param sync: if True, execute the functions, hang and wait for the result of each
operation, then return. Else schedules the thread but does not wait for the
result and yields None right away
:param kwargs: keyword arguments to be fed to the function""" # applyTh
fs = [f]; super().__init__(fs=fs); self.f = fs[0]; self.kwargs = kwargs # applyTh
self.prefetch = prefetch or int(1e9); self.timeout = timeout; self.bs = bs; self.sync = sync # applyTh
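# A hedged usage sketch (not part of the library source): applyTh shines for io-bound work, e.g.
# fetching a bunch of URLs concurrently. The url list and batch size here are illustrative.
#
#   urls = ["https://example.com"] * 20
#   urls | applyTh(lambda u: requests.get(u).status_code, prefetch=10, bs=5) | cli.deref()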
[docs] def __ror__(self, it): # applyTh
if self.bs > 1: # applyTh
yield from (it | cli.batched(self.bs, True) | applyTh(apply(self.f), self.prefetch, self.timeout, sync=self.sync) | cli.joinStreams()); return # applyTh
datas = deque(); it = iter(it); kwargs = self.kwargs; sync = self.sync # applyTh
innerF = fastF(self.f); timeout = self.timeout # applyTh
def f(line, wrapper): wrapper.value = innerF(line, **kwargs) # applyTh
for _, line in zip(range(self.prefetch), it): # applyTh
w = k1lib.Wrapper(thEmptySentinel) # applyTh
t = threading.Thread(target=f, args=(line,w)) # applyTh
t.start(); datas.append((t, w)) # applyTh
for line in it: # applyTh
data = datas.popleft() # pop before checking sync to clean up memory # applyTh
if sync: # applyTh
data[0].join(timeout) # applyTh
if data[1].value is thEmptySentinel: # applyTh
for data in datas: data[0].join(0.01) # applyTh
raise RuntimeError("Thread timed out!") # applyTh
yield data[1].value # applyTh
else: yield None # applyTh
w = k1lib.Wrapper(thEmptySentinel) # applyTh
t = threading.Thread(target=f, args=(line,w)) # applyTh
t.start(); datas.append((t, w)) # applyTh
for i in range(len(datas)): # do it this way so that python can remove threads early, due to ref counting # applyTh
data = datas.popleft() # applyTh
if sync: # applyTh
data[0].join(timeout) # applyTh
if data[1].value is thEmptySentinel: # applyTh
for data in datas: data[0].join(0.01) # applyTh
raise RuntimeError("Thread timed out!") # applyTh
yield data[1].value # applyTh
else: yield None # applyTh
def _copy(self): return applyTh(self.f, self.prefetch, self.timeout, self.bs, **self.kwargs) # applyTh
[docs] def __invert__(self): # applyTh
res = self._copy(); f = fastF(res.f) # applyTh
kw = res.kwargs # applyTh
res.f = lambda x: f(*x, **kw) # applyTh
res.kwargs = {} # applyTh
return res # applyTh
[docs]class applySerial(BaseCli): # applySerial
blurb="Applies a function to an element repeatedly" # applySerial
[docs] def __init__(self, f, *args, **kwargs): # applySerial
"""Applies a function repeatedly. First yields input iterator ``x``. Then
yields ``f(x)``, then ``f(f(x))``, then ``f(f(f(x)))`` and so on. Example::
# returns [2, 4, 8, 16, 32]
2 | applySerial(op()*2) | head(5) | deref()
If the result of your operation is an iterator, you might want to
:class:`~k1lib.cli.utils.deref` it, like this::
rs = iter(range(8)) | applySerial(rows()[::2])
# returns [0, 2, 4, 6]
rs | rows(1) | item() | deref()
# returns []. This is because all the elements are taken by the previous deref()
rs | item() | deref()
# returns [[2, 8], [10, -6], [4, 16], [20, -12]]
[2, 8] | ~applySerial(lambda a, b: (a + b, a - b)) | head(4) | deref()
rs = iter(range(8)) | applySerial(rows()[::2] | deref())
# returns [0, 2, 4, 6]
rs | rows(1) | item()
# returns [0, 4]
rs | item() # or `next(rs)`
# returns [0]
rs | item() # or `next(rs)`
:param f: function to apply repeatedly""" # applySerial
fs = [f]; super().__init__(fs=fs); self.f = fs[0] # applySerial
self.unpack = False; self.args = args; self.kwargs = kwargs # applySerial
[docs] def __ror__(self, it): # applySerial
f = fastF(self.f) # applySerial
if self.unpack: # applySerial
it = init.dfGuard(it) # applySerial
while True: yield it; it = f(*it, *self.args, **self.kwargs) # applySerial
else: # applySerial
while True: yield it; it = f(it, *self.args, **self.kwargs) # applySerial
[docs] def __invert__(self): # applySerial
ans = applySerial(self.f, *self.args, **self.kwargs) # applySerial
ans.unpack = True; return ans # applySerial
def argsort(it, key=None, reverse=False): # argsort
if isinstance(it, settings.arrayTypes): return np.argsort(it) # this mode ignores key and reverse! # argsort
if key: return sorted(range(len(it)), key=lambda i: key(it[i]), reverse=reverse) # argsort
else: return sorted(range(len(it)), key=it.__getitem__, reverse=reverse) # argsort
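# A hedged sketch (not part of the library source) of what argsort returns: the indices that would
# sort the input, which sort() below also uses to "unsort" results back to their original order.
#
#   argsort([3, 1, 2])                          # [1, 2, 0]
#   argsort([3, 1, 2], reverse=True)            # [0, 2, 1]
#   [3, 1, 2] | cli.rows(*argsort([3, 1, 2]))   # [1, 2, 3]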
[docs]class sort(BaseCli): # sort
blurb="Sorts list/table based on an optional column" # sort
[docs] def __init__(self, column:int=0, numeric=True, reverse=False, unsort=False): # sort
"""Sorts list/table based on a specific `column`.
Example::
# returns [[5, 'a'], [1, 'b']]
[[1, "b"], [5, "a"]] | ~sort(0) | deref()
# returns [[2, 3]]
[[1, "b"], [5, "a"], [2, 3]] | ~sort(1) | deref()
# errors out, as you can't really compare str with int
[[1, "b"], [2, 3], [5, "a"]] | sort(1, False) | deref()
# returns [-1, 2, 3, 5, 8]
[2, 5, 3, -1, 8] | sort(None) | deref()
.. admonition:: unsort
This is how it works::
a = np.array([1, 5, 9, 2, 6, 3, 7, 4, 8])
# returns np.array([1, 5, 9, 2, 6, 3, 7, 4, 8])
a | sort(None, unsort=True)
# returns np.array([1, 2, 3, 4, 5, 6, 7, 8, 9]), normal sort
a | sort(None)
# returns np.array([-3.5, 0.5, 4.5, -2.5, 1.5, -1.5, 2.5, -0.5, 3.5]), sorts, do transformation, then unsort
a | (sort(None, unsort=True) | aS(lambda x: x - x[-1]/2))
# returns np.array([12.25, 0.25, 20.25, 6.25, 2.25, 2.25, 6.25, 0.25, 12.25])
a | (sort(None, unsort=True) | aS(lambda x: (x - x[-1]/2)**2))
How this works is that it will sort everything as usual, then it'll execute the captured
transformation and then it will unsort everything. This is for scenarios when an operation
needs to operate on sorted data, but you still want to keep the original ordering for some
reason.
:param column: if None, sort rows based on themselves and not an element
:param numeric: whether to convert column to float
:param reverse: False for smaller to bigger, True for bigger to smaller. Use
:meth:`__invert__` to quickly reverse the order instead of using this param
:param unsort: whether to sort and then unsort the input or not""" # sort
super().__init__(capture=True) # sort
self.column = column; self.reverse = reverse; self.numeric = numeric; self.unsort = unsort # sort
self.filterF = (lambda x: float(x)) if numeric else (lambda x: str(x)) # sort
def _all_array_opt(self, it, level): # sort
if self.unsort: return NotImplemented # too complex to think about right now # sort
c = self.column; reverse = self.reverse; p = [slice(None, None, None)]*level; p1 = (*p, slice(None, None, None)); ser = self.capturedSerial # sort
if c is None and len(it.shape)-level != 1: raise Exception(f"Expected sort(None) to take in a 1-d array, but the array has shape {it.shape[level:]}") # sort
if c is not None and len(it.shape)-level != 2: raise Exception(f"Expected sort(col) to take in a 2-d array, but the array has shape {it.shape[level:]}") # sort
bm = np if isinstance(it, np.ndarray) else (torch if (hasTorch and isinstance(it, torch.Tensor)) else None) # sort
if bm is not None: # sort
if c is None: b = bm.argsort(it); b = bm.flip(b, (level,)) if reverse else b; return bm.gather(it, level, b) | ser.all(level) # sort
else: b = bm.argsort(it[(*p1, c)]); b = bm.flip(b, (level,)) if reverse else b; return bm.gather(it, level, b[(*p1, None)].expand(it.shape)) | ser.all(level) # sort
return NotImplemented # sort
[docs] def __ror__(self, it:Iterator[str]): # sort
c = self.column; reverse = self.reverse; unsort = self.unsort; bm = None; ser = self.capturedSerial # sort
if hasPandas: # sort
if isinstance(it, pd.DataFrame): # sort
if c is None: it = init.dfGuard(it) # sort
else: # sort
s = it[list(it)[c]]; arg = s.argsort(); arg = (arg[::-1] if reverse else arg).to_numpy(); res = it.iloc[arg] | ser # sort
if unsort: return res.iloc[argsort(arg)] if isinstance(res, pd.DataFrame) else res | cli.rows(*argsort(arg)) # sort
else: return res # sort
elif isinstance(it, pd.core.arraylike.OpsMixin): it = it.to_numpy() # sort
if isinstance(it, settings.arrayTypes): # sort
if c is None and len(it.shape) != 1: raise Exception(f"Expected sort(None) to take in a 1-d array, but the array has shape {it.shape}") # sort
if c is not None and len(it.shape) != 2: raise Exception(f"Expected sort(col) to take in a 2-d array, but the array has shape {it.shape}") # sort
bm = np if isinstance(it, np.ndarray) else (torch if (hasTorch and isinstance(it, torch.Tensor)) else None) # sort
if bm: # sort
arg = bm.argsort(it) if c is None else bm.argsort(it[:,c]); arg = bm.flip(arg, (0,)) if reverse else arg # sort
return it[arg] | ser | (cli.rows(*argsort(arg)) if self.unsort else cli.iden()) # sort
f = self.filterF # sort
rows = list(it) if c is None else list((it | cli.isNumeric(c) if self.numeric else it) | cli.apply(list)) # sort
def sortF(row): # sort
if len(row) > c: return f(row[c]) # sort
return float("inf") # sort
if self.unsort: # sort
arg = argsort(rows, f if c is None else sortF, self.reverse) # sort
return rows | cli.rows(*arg) | ser | cli.rows(*argsort(arg)) # sort
return sorted(rows, key=f if c is None else sortF, reverse=self.reverse) | ser # sort
[docs] def __invert__(self): # sort
"""Creates a clone that has the opposite sort order""" # sort
return sort(self.column, self.numeric, not self.reverse, self.unsort) # sort
def _jsF(self, meta): # sort
fIdx = init._jsFAuto(); dataIdx = init._jsDAuto(); argIdx = init._jsDAuto() # sort
if self.unsort: raise Exception("sort._jsF() unsort mode is unavailable, as the cli capture mechanism isn't possible in JS") # sort
return f"{fIdx} = ({dataIdx}) => {dataIdx}.ksort({cli.kjs.v(self.column)}, {cli.kjs.v(self.numeric)}, {cli.kjs.v(self.reverse)})", fIdx # sort
[docs]class sortF(BaseCli): # sortF
"Sorts list/table using a function" # sortF
[docs] def __init__(self, f:Callable[[Any], float], column:int=None, reverse=False): # sortF
"""Sorts list/table using a function.
Example::
# returns ['a', 'aa', 'aaa', 'aaaa', 'aaaaa']
["a", "aaa", "aaaaa", "aa", "aaaa"] | sortF(lambda r: len(r)) | deref()
# returns ['aaaaa', 'aaaa', 'aaa', 'aa', 'a']
["a", "aaa", "aaaaa", "aa", "aaaa"] | ~sortF(lambda r: len(r)) | deref()""" # sortF
fs = [f]; super().__init__(fs=fs); self.f = fs[0]; self._fC = fastF(self.f) # sortF
self.column = column; self.reverse = reverse # sortF
[docs] def __ror__(self, it:Iterator[Any]) -> Iterator[Any]: # sortF
c = self.column; f = self._fC # sortF
if c is None: # sortF
it = init.dfGuard(it) # sortF
try: it[:]; len(it) # sortF
except: it = list(it) # sortF
return sorted(it, key=f, reverse=self.reverse) # sortF
if hasPandas and isinstance(it, pd.DataFrame): # sortF
col = it[list(it)[c]] # sortF
arg = [x[0] for x in sorted([[i, f(x)] for i,x in enumerate(col)], key=lambda x:x[1], reverse=self.reverse)] # sortF
return it.iloc[arg] # sortF
def sortF(row): # sortF
if len(row) > c: return f(row[c]) # sortF
return float("inf") # sortF
return sorted(list(it), key=sortF, reverse=self.reverse) # sortF
[docs] def __invert__(self) -> "sortF": # sortF
return sortF(self.f, self.column, not self.reverse) # sortF
def _jsF(self, meta): # sortF
fIdx = init._jsFAuto(); dataIdx = init._jsDAuto(); argIdx = init._jsDAuto() # sortF
header, _fIdx, _async = k1lib.kast.asyncGuard(k1lib.kast.prepareFunc3(self.f, ("sortF", meta))) # sortF
return f"{header}\n{fIdx} = {'async ' if _async else ''}({dataIdx}) => {{ return {'await ' if _async else ''}{dataIdx}.sortF{'_async' if _async else ''}(({argIdx}) => {_fIdx}({argIdx}), {cli.kjs.v(self.column)}, {cli.kjs.v(self.reverse)}); }}", fIdx # sortF
[docs]class consume(BaseCli): # consume
blurb="Consumes the iterator in a side stream and returns the iterator" # consume
[docs] def __init__(self, f:Union[BaseCli, Callable[[Any], None]]): # consume
r"""Consumes the iterator in a side stream and returns the iterator.
Kinda like the bash command ``tee``. Example::
# prints "0\n1\n2" and returns [0, 1, 2]
range(3) | consume(headOut()) | toList()
# prints "range(0, 3)" and returns [0, 1, 2]
range(3) | consume(lambda it: print(it)) | toList()
This is useful whenever you want to mutate something, but don't want to
include the function result into the main stream.
See also: :class:`~k1lib.cli.output.tee`""" # consume
fs = [f]; super().__init__(fs=fs); self.f = fs[0] # consume
[docs] def __ror__(self, it): # consume
self.f(it); return it # consume
def batched_randperm(n, l, gen=None): # courtesy of https://discuss.pytorch.org/t/batch-version-of-torch-randperm/111121/3 # batched_randperm
if gen is None: return np.argsort(np.random.rand(l, n)) # batched_randperm
else: return torch.argsort(torch.rand(l, n, generator=gen), dim=-1) # batched_randperm
def randperm(n, gen=None): return batched_randperm(n, 1, gen)[0] # randperm
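# A hedged sketch (not part of the library source): batched_randperm(n, l) yields l independent
# permutations of range(n) at once (shape (l, n)), which randomize() uses to shuffle many rows of
# an array in one go; randperm(n) is the single-permutation special case.
#
#   batched_randperm(4, 2)   # e.g. array([[2, 0, 3, 1], [1, 3, 0, 2]])
#   randperm(4)              # e.g. array([3, 0, 2, 1])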
default = 100 # randperm
[docs]class randomize(BaseCli): # randomize
blurb="Randomizes the input elements' order" # randomize
[docs] def __init__(self, bs=default, seed=None): # randomize
"""Randomize input stream. In order to be efficient, this does not
convert the input iterator to a giant list and yield random values from that.
Instead, this fetches ``bs`` items at a time, randomizes them, returns and
fetch another ``bs`` items. If you want to do the giant list, then just pass
in ``float("inf")``, or ``None``. Example::
# returns [0, 1, 2, 3, 4], effectively no randomize at all
range(5) | randomize(1) | deref()
# returns something like this: [1, 0, 2, 3, 5, 4, 6, 8, 7, 9]. You can clearly see the batches
range(10) | randomize(3) | deref()
# returns something like this: [7, 0, 5, 2, 4, 9, 6, 3, 1, 8]
range(10) | randomize(float("inf")) | deref()
# same as above
range(10) | randomize(None) | deref()
# returns True, as the seed is the same
range(10) | randomize(seed=4) | deref() == range(10) | randomize(seed=4) | deref()
Note that if ``seed`` is specified, then it will randomize all input
iterators the same way and independently of each other. Meaning::
r = randomize(seed=42)
range(10) | r | deref() # returns [6, 9, 1, 2, 0, 8, 3, 5, 4, 7]
range(10) | r | deref() # also returns [6, 9, 1, 2, 0, 8, 3, 5, 4, 7]
This may or may not be desirable, but I think it's desirable.
:param bs: batch size
:param seed: if specified, will always randomize the input iterator in the same way""" # randomize
self.bs = bs if bs != None else float("inf") # randomize
self.seed = seed # randomize
if seed is not None and not hasTorch: raise Exception("Seeded randomize() depends on PyTorch. Please install it first") # randomize
def _newTorchGen(self): return torch.Generator().manual_seed(random.Random(self.seed).getrandbits(63)) # randomize
def _newGenn(self): # randomize
if self.seed is None: return randperm # randomize
gen = self._newTorchGen(); return lambda n: randperm(n, gen) # randomize
def _newGenn2(self): # randomize
if self.seed is None: return batched_randperm # randomize
gen = self._newTorchGen(); return lambda n, l: batched_randperm(n, l, gen) # randomize
def _all_array_opt(self, it, level): # randomize
perms = self._newGenn2()(it.shape[level], np.prod(it.shape[:level])) # randomize
b = it | cli.joinSt(level-1); return b[np.arange(len(b))[:, None], perms].reshape(it.shape) # randomize
[docs] def __ror__(self, it:Iterator[Any]) -> Iterator[Any]: # randomize
bs = self.bs # randomize
if isinstance(it, settings.arrayTypes): # randomize
if bs is default or bs is None or len(it) <= bs: return it if len(it) == 1 else it[self._newGenn()(len(it))] # randomize
if hasPandas and isinstance(it, pd.DataFrame): # randomize
if bs is default or bs is None or len(it) <= bs: return it.iloc[self._newGenn()(len(it))] # randomize
if bs is default: bs = 100 # randomize
def gen(): # randomize
genn = self._newGenn() # randomize
for batch in it | cli.batched(bs, True): # randomize
batch = list(batch); perms = genn(len(batch)) # randomize
for idx in perms: yield batch[idx] # randomize
return gen() # randomize
def _jsF(self, meta): # randomize
fIdx = init._jsFAuto(); dataIdx = init._jsDAuto() # randomize
return f"{fIdx} = ({dataIdx}) => {dataIdx}.randomize({cli.kjs.v(self.seed)})", fIdx # randomize
class StaggeredStream: # StaggeredStream
def __init__(self, stream:Iterator[Any], every:int): # StaggeredStream
"""Not intended to be instantiated by the end user. Use :class:`stagger`
instead.""" # StaggeredStream
self.stream = stream; self.every = every # StaggeredStream
def __iter__(self): # StaggeredStream
for i, v in zip(range(self.every), self.stream): yield v # StaggeredStream
def __len__(self): # StaggeredStream
"""Length of window (length of result if you were to deref it).""" # StaggeredStream
return self.every # StaggeredStream
[docs]class stagger(BaseCli): # stagger
blurb='Staggers input stream into multiple stream "windows" placed serially' # stagger
[docs] def __init__(self, every:int): # stagger
"""Staggers input stream into multiple stream "windows" placed serially. Best
explained with an example::
o = range(10) | stagger(3)
o | deref() # returns [0, 1, 2], 1st "window"
o | deref() # returns [3, 4, 5], 2nd "window"
o | deref() # returns [6, 7, 8]
o | deref() # returns [9]
o | deref() # returns []
This might be useful when you're constructing a data loader::
dataset = [range(20), range(30, 50)] | transpose()
dl = dataset | batched(3) | (transpose() | toTensor()).all() | stagger(4)
for epoch in range(3):
for xb, yb in dl: # looping over a window
print(epoch)
# then something like: model(xb)
The above code will print 6 lines. 4 of them are "0" (because we stagger every 4
batches), and xb's shape will be (3,) (because we batched every 3 samples).
You should also keep in mind that this doesn't really change the property of the
stream itself. Essentially, treat these pairs of statement as being the same thing::
o = range(11, 100)
# both returns 11
o | stagger(20) | item()
o | item()
# both returns [11, 12, ..., 20]
o | head(10) | deref()
o | stagger(20) | head(10) | deref()
Lastly, multiple iterators might be getting values from the same stream window,
meaning::
o = range(11, 100) | stagger(10)
it1 = iter(o); it2 = iter(o)
next(it1) # returns 11
next(it2) # returns 12
This may or may not be desirable. Also this should be obvious, but I want to
mention this in case it's not clear to you.""" # stagger
self.every = int(every) # stagger
[docs] def __ror__(self, it:Iterator[Any]) -> StaggeredStream: # stagger
return StaggeredStream(iter(init.dfGuard(it)), self.every) # stagger
[docs] @staticmethod # stagger
def tv(every:int, ratio:float=0.8): # stagger
"""Convenience method to quickly stagger train and valid datasets.
Example::
# returns [[16], [4]]
[range(100)]*2 | stagger.tv(20) | shape().all() | deref()""" # stagger
return stagger(round(every*ratio)) + stagger(round(every*(1-ratio))) # stagger
compareOps = {"__lt__", "__le__", "__eq__", "__ne__", "__gt__", "__ge__"} # stagger
[docs]class op(k1lib.Absorber, BaseCli): # op
blurb="Shorthand for lambda functions" # op
[docs] def __init__(self): # op
"""Absorbs operations done on it and applies it on the stream. Based
on :class:`~k1lib.Absorber`. Example::
# returns 16
4 | op()**2
# returns 16, equivalent to the above
4 | aS(lambda x: x**2)
# returns [0, 1, 4, 9, 16]
range(5) | apply(op()**2) | deref()
# returns [0, 1, 4, 9, 16], equivalent to the above
range(5) | apply(lambda x: x**2) | deref()
Main advantage is that you don't have to waste keystrokes when you just want
to do a simple operation. How it works underneath is a little magical, so just
treat it as a blackbox. A more complex example::
t = torch.tensor([[1, 2, 3], [4, 5, 6.0]])
# returns [torch.tensor([[4., 5., 6., 7., 8., 9.]])]
[t] | (op() + 3).view(1, -1).all() | deref()
Basically, you can treat ``op()`` as the input tensor. Tbh, you
can do the same thing with this::
[t] | applyS(lambda t: (t+3).view(1, -1)).all() | deref()
But that's kinda long and may not be obvious. This can be surprisingly resilient, as
you can still combine with other cli tools as usual, for example::
# returns [2, 3], demonstrating "&" operator
torch.randn(2, 3) | (op().shape & iden()) | deref() | item()
a = torch.tensor([[1, 2, 3], [7, 8, 9]])
# returns torch.tensor([4, 5, 6]), demonstrating "+" operator for clis and not clis
(a | op() + 3 + iden() | item() == torch.tensor([4, 5, 6])).all()
# returns [[3], [3]], demonstrating .all() and "|" serial chaining
torch.randn(2, 3) | (op().shape.all() | deref())
# returns [[8, 18], [9, 19]], demonstrating you can treat `op()` as a regular function
[range(10), range(10, 20)] | transpose() | filt(op() > 7, 0) | deref()
# returns [3, 4, 5, 6, 7, 8, 9], demonstrating bounds comparison
range(100) | filt(3 <= op() < 10) | deref()
This can only deal with simple operations. For complex operations, resort
to the longer version ``aS(lambda x: ...)`` instead!
There are also operations that are difficult to achieve, like
``len(op())``, as Python is expecting an integer output, so
``op()`` can't exactly take over. Instead, you have to use :class:`aS`,
or do ``op().ab_len()``. Get a list of all of these special operations
in the source of :class:`~k1lib.Absorber`.
Performance-wise, in most cases, there is no degradation, so don't worry
about it. Everything is pretty much on par with native lambdas::
n = 10_000_000
# takes 1.48s
for i in range(n): i**2
# takes 1.89s, 1.28x worse than for loop
range(n) | apply(lambda x: x**2) | ignore()
# takes 1.86s, 1.26x worse than for loop
range(n) | apply(op()**2) | ignore()
# takes 1.86s
range(n) | (op()**2).all() | ignore()
More complex operations still retain roughly the same speeds, as there's a JIT compiler built in::
# takes 2.15s
for i in range(n): (i**2-3)*0.1
# takes 2.53s, 1.18x worse than for loop
range(n) | apply(lambda x: (x**2-3)*0.1) | ignore()
# takes 2.46s, 1.14x worse than for loop
range(n) | apply((op()**2-3)*0.1) | ignore()
Reserved operations that are not absorbed are:
- all
- __ror__ (__or__ still works!)
- ab_solidify
- op_hint""" # op
super().__init__({"_hint": None}) # op
[docs] @staticmethod # op
def solidify(f): # op
"""Static equivalent of ``a.ab_solidify()``.
Example::
f = op()**2
f = op.solidify(f)
If ``f`` is not an ``op``, then just return it without doing anything to it""" # op
if f.__class__.__name__.split(".")[-1] == "op": f.ab_solidify() # op
return f # op
[docs] def __ror__(self, it): # op
return self.ab_operate(it) # op
def __or__(self, o): # op
if isinstance(o, BaseCli): return super(k1lib.Absorber, self).__or__(o) # op
return super().__add__(o) # op
def __add__(self, o): # op
if isinstance(o, BaseCli): return super(k1lib.Absorber, self).__add__(o) # op
return super().__add__(o) # op
def __and__(self, o): # op
if isinstance(o, BaseCli): return super(k1lib.Absorber, self).__and__(o) # op
return super().__and__(o) # op
def __call__(self, *args, **kwargs): # op
if self._ab_solidified: return self.ab_operate(*args, **kwargs) # op
return super().__call__(*args, **kwargs) # op
def _typehint(self, inp): # op
return self._hint if self._hint is not None else tAny() # op
[docs] def op_hint(self, _hint): # op
"""Specify output type hint""" # op
self._ab_sentinel = True; self._hint = _hint # op
self._ab_sentinel = False; return self # op
def _jsF(self, meta): return cli.aS(self)._jsF(meta) # op
cli.op = op # op
[docs]class integrate(BaseCli): # integrate
blurb="Calculates the cumulative sum of the input" # integrate
[docs] def __init__(self, col=None, dt=1): # integrate
"""Integrates the input.
Example::
# returns [0, 1, 3, 6, 10, 15, 21, 28, 36, 45]
range(10) | integrate() | deref()
# returns [0, 2, 6, 12, 20, 30, 42, 56, 72, 90]
range(10) | integrate(dt=2) | deref()
# returns [['a', 0], ['b', 1], ['c', 4], ['d', 10]]
[["a", 0], ["b", 1], ["c", 3], ["d", 6]] | integrate(1) | deref()
# returns [['a', 0], ['b', 2], ['c', 8], ['d', 20]]
[["a", 0], ["b", 1], ["c", 3], ["d", 6]] | integrate(1, dt=2) | deref()
:param col: column to integrate over
:param dt: optional step size, or delta time""" # integrate
self.col = col; self.dt = dt # integrate
[docs] def __ror__(self, it): # integrate
it = init.dfGuard(it) # integrate
if self.col is None: # integrate
if self.dt == 1: # integrate
s = 0 # integrate
for e in it: s += e; yield s # integrate
else: # integrate
dt = self.dt; s = 0 # integrate
for e in it: s += e*dt; yield s # integrate
else: # integrate
col = self.col # integrate
if self.dt == 1: # integrate
s = 0 # integrate
for e in it: s += e[col]; e[col] = s; yield e # integrate
else: # integrate
dt = self.dt; s = 0 # integrate
for e in it: s += e[col]*dt; e[col] = s; yield e # integrate
[docs]class roll(BaseCli): # roll
blurb="Rolls the input by some amount of shift" # roll
[docs] def __init__(self, shift:int): # roll
"""Rolls the input some amount of shift.
Example::
# returns [7, 8, 9, 0, 1, 2, 3, 4, 5, 6]
range(10) | roll(3)
:param shift: shift amount""" # roll
self.shift = shift # roll
def _all_array_opt(self, it, level): # roll
if isinstance(it, np.ndarray): return np.roll(it, self.shift, level) # roll
if hasTorch and isinstance(it, torch.Tensor): return torch.roll(it, self.shift, level) # roll
def _all_opt2(self): return NotImplemented # should not do this optimization, this is not dimension-agnostic! # roll
[docs] def __ror__(self, it): # roll
shift = self.shift # roll
if isinstance(it, np.ndarray): return np.roll(it, shift, 0) # roll
if hasTorch and isinstance(it, torch.Tensor): return torch.roll(it, shift, 0) # roll
if hasPandas and isinstance(it, (pd.DataFrame, pd.core.arraylike.OpsMixin)): # roll
arg = np.roll(np.arange(len(it)), shift, 0) # roll
if isinstance(it, pd.DataFrame): return it.iloc[arg] # roll
return it[arg] # roll
it = init.dfGuard(it) # roll
try: it[0]; len(it) # roll
except: it = list(it) # roll
return [*it[-shift:], *it[:-shift]] # roll
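# --- Illustrative sketch, not part of the module ----------------------------------
# roll() rotates along the first dimension regardless of the container type:
#   range(5) | roll(2)                        # [3, 4, 0, 1, 2]
#   np.arange(6).reshape(3, 2) | roll(1)      # rows rotate: [[4, 5], [0, 1], [2, 3]]
# -----------------------------------------------------------------------------------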
[docs]class clamp(BaseCli): # clamp
blurb="Clamps input list/array between 2 values" # clamp
[docs] def __init__(self, col=None, min=0, max=1, std=None): # clamp
"""Clamps input list/array between 2 values.
Example::
# returns [3, 3, 3, 3, 4, 5, 6, 7, 7, 7]
range(10) | clamp(None, 3, 7) | deref()
# clamps the column at index 1 (0-indexed) between 0 and 2
np.random.randn(10, 3) | clamp(1, 0, 2)
This cli has 2 modes: absolute mode and std mode. Absolute mode
clamps between ``min`` and ``max``, and is active when ``std`` is left
as ``None``.
Std mode is activated when you specify a ``std`` value. Then, it calculates
the mean and std of the incoming data and derives the min and
max values from them.
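An illustrative example of std mode (the input here is random, so the output
values will vary)::
# clamps everything to within 2 standard deviations of the mean
np.random.randn(100) | clamp(None, std=2)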
:param col: column to clamp. If None, clamps the whole input
:param min: lower bound, used in absolute mode
:param max: upper bound, used in absolute mode
:param std: if specified, switches to std mode and clamps to within this many standard deviations of the mean""" # clamp
self.col = col; self.min = min; self.max = max; self.std = std # clamp
def _all_array_opt(self, it, level): # this was pretty painful, I have to admit! # clamp
n = len(it.shape); col = self.col; std = self.std # clamp
if col is None: # clamp
if self.std is not None: # clamp
it = np.copy(it) if isinstance(it, np.ndarray) else torch.clone(it) # clamp
b = it | cli.joinSt(n-level-1).all(level) # clamp
min_ = (b.mean(level) - std*b.std(level)) | cli.repeat(b.shape[-1]).all(level) # clamp
max_ = (b.mean(level) + std*b.std(level)) | cli.repeat(b.shape[-1]).all(level) # clamp
b[b < min_] = min_[b < min_]; b[b > max_] = max_[b > max_]; return b.reshape(it.shape) # clamp
else: # clamp
if isinstance(it, np.ndarray): return np.clip(it, self.min, self.max) # clamp
if hasTorch and isinstance(it, torch.Tensor): return torch.clamp(it, self.min, self.max) # clamp
else: # clamp
if self.std is not None: # clamp
it = np.copy(it) if isinstance(it, np.ndarray) else torch.clone(it) # clamp
b = it[(*[slice(None,None,None)]*level,col)] | cli.joinSt(n-level-2).all(level) # clamp
min_ = (b.mean(level) - std*b.std(level)) | cli.repeat(b.shape[-1]).all(level) # clamp
max_ = (b.mean(level) + std*b.std(level)) | cli.repeat(b.shape[-1]).all(level) # clamp
b[b < min_] = min_[b < min_]; b[b > max_] = max_[b > max_]; return it # clamp
else: # clamp
it = np.copy(it) if isinstance(it, np.ndarray) else torch.clone(it) # clamp
it[(*[slice(None,None,None)]*level,col)] = (np.clip if isinstance(it, np.ndarray) else torch.clamp)(it[(*[slice(None,None,None)]*level,col)], self.min, self.max); return it # clamp
return NotImplemented # clamp
[docs] def __ror__(self, it): # clamp
col = self.col; min_ = self.min; max_ = self.max; std = self.std # clamp
if isinstance(it, np.ndarray): # clamp
if col is None: # clamp
if std is None: return np.clip(it, min_, max_) # clamp
m = it.mean(); s = it.std(); return np.clip(it, m-s*std, m+s*std) # clamp
else: # clamp
a = np.copy(it); c = a[:,col] # clamp
if std is None: a[:,col] = np.clip(c, min_, max_) # clamp
else: m = c.mean(); s = c.std(); a[:,col] = np.clip(c, m-s*std, m+s*std) # clamp
return a # clamp
if hasTorch and isinstance(it, torch.Tensor): # clamp
if col is None: # clamp
if std is None: return torch.clamp(it, min_, max_) # clamp
m = it.mean(); s = it.std(); return torch.clamp(it, m-s*std, m+s*std) # clamp
else: # clamp
a = torch.clone(it); c = a[:,col] # clamp
if std is None: a[:,col] = torch.clamp(c, min_, max_) # clamp
else: m = c.mean(); s = c.std(); a[:,col] = torch.clamp(c, m-s*std, m+s*std) # clamp
return a # clamp
if hasPandas and isinstance(it, pd.DataFrame): # clamp
if col is None: it = init.dfGuard(it) # clamp
else: # clamp
c = it[list(it)[col]] # clamp
if std is None: c = np.clip(c, min_, max_) # clamp
else: m = c.mean(); s = c.std(); c = np.clip(c, m-s*std, m+s*std) # clamp
return it.replaceCol(list(it)[col], c) # clamp
return it | cli.apply(lambda x: max(min(x, max_), min_), self.col) # TODO: needs fixing - this generic fallback ignores std mode # clamp
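# --- Illustrative sketch, not part of the module ----------------------------------
# Column-wise clamping on a 2-D array: only the selected column is modified, and
# the input array itself is left untouched because __ror__ works on a copy:
#   a = np.random.randn(10, 3)
#   b = a | clamp(1, -0.5, 0.5)        # column 1 clipped to [-0.5, 0.5]
#   c = a | clamp(1, std=1)            # same column, bounds derived from its mean/std
# -----------------------------------------------------------------------------------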