Source code for k1lib.cli.modifier

# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
"""
This is for quick modifiers, think of them as changing formats, or modifying an
array in place
"""
__all__ = ["applyS", "aS", "iterDelay", "apply", "applyMp", "parallel", "applyCl",
           "applyTh", "applySerial",
           "sort", "sortF", "consume", "randomize", "stagger", "op",
           "integrate", "roll", "clamp"]
from typing import Callable, Iterator, Any, Union, List, Tuple
from k1lib.cli.init import patchDefaultDelim, BaseCli, fastF; import k1lib.cli.init as init
import k1lib.cli as cli, numpy as np, threading, gc, random, k1lib
from collections import deque, defaultdict; from functools import partial, update_wrapper, lru_cache
from k1lib.cli.typehint import *; requests = k1lib.dep.requests
import dill, pickle, json, k1lib, warnings, atexit, signal, time, os, random, sys
try: import torch; import torch.multiprocessing as mp; hasTorch = True
except: import multiprocessing as mp; hasTorch = False
try: import ray; hasRay = True
except: hasRay = False
try: import pandas as pd; pd.core; hasPandas = True
except: hasPandas = False
settings = k1lib.settings.cli
[docs]class applyS(BaseCli): # applyS blurb="Applies a function to the input in pipe style" # applyS
[docs] def __init__(self, f:Callable[[Any], Any], *args, **kwargs): # applyS """Like :class:`apply`, but much simpler, just operating on the entire input object, essentially. The "S" stands for "single". There's also an alias shorthand for this called :class:`aS`. Example:: # returns 5 3 | aS(lambda x: x+2) Like :class:`apply`, you can also use this as a decorator like this:: @aS def f(x): return x+2 # returns 5 3 | f This also decorates the returned object so that it has the same qualname, docstring and whatnot. .. admonition:: Shorthands Writing out "lambda x:" all the time is annoying, and there are ways to quickly say ``lambda x: x+2`` like so:: 3 | op()+2 # returns 5 3 | aS("x+2") # returns 5. Behind the scenes, it compiles and executes `lambda x: x+2` The first way is to use :class:`op`, which will absorb all operations done on it, like "+", and return a function that essentially replays all the operations. In the second way, you only have to pass in the string containing code that you want done on the variable "x". Then internally, it will be compiled into regular Python code. In fact, you can pass in ``op()`` or just a string to any cli that accepts any kind of function, like :class:`~k1lib.cli.filt.filt` or :class:`apply`:: range(4) | apply("x-2") | deref() range(4) | apply(op()-2) | deref() range(4) | filt("x%2") | deref() range(4) | filt(op()%2) | deref() :param f: the function to be executed :param kwargs: other keyword arguments to pass to the function, together with ``args``""" # applyS super().__init__(fs=[f]); self.args = args; self.kwargs = kwargs # applyS self.f = f; self._fC = fastF(f); update_wrapper(self, f, updated=()) # applyS self.inverted = False; self.preInvAS = None # applyS
    def _typehint(self, inp):                                                    # applyS
        if self.hasHint: return self._hint                                       # applyS
        try: return self.f._typehint(inp)                                        # applyS
        except: return tAny()                                                    # applyS
[docs] def __ror__(self, it:Any) -> Any: return self._fC(it, *self.args, **self.kwargs) # applyS
    def _all_array_opt(self, it, level):                                         # applyS
        res = None; ogShape = tuple(it.shape[:level])                            # applyS
        if isinstance(it, np.ndarray):                                           # applyS
            n = len(it.shape); out = self._fC(np.transpose(it, (*range(level, n), *range(level))), *self.args, **self.kwargs); outN = len(out.shape) # applyS
            res = np.transpose(out, (*range(outN - level, outN), *range(outN - level))) # applyS
        if hasTorch and isinstance(it, torch.Tensor):                            # applyS
            n = len(it.shape); out = self._fC(torch.transpose_axes(it, (*range(level, n), *range(level))), *self.args, **self.kwargs); outN = len(out.shape) # applyS
            res = torch.transpose_axes(out, (*range(outN - level, outN), *range(outN - level))) # applyS
        if res is None: return NotImplemented                                    # applyS
        elif ogShape != tuple(res.shape[:level]): return NotImplemented          # applyS
        else: return res                                                         # applyS
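# A minimal illustrative sketch (not part of the library source) of what the axis shuffling in
# _all_array_opt() above amounts to for numpy inputs. Assuming it.shape == (2, 3, 4) and level == 1,
# the first `level` axes are rotated to the back before calling f, then rotated back afterwards:
#     np.transpose(it, (1, 2, 0)).shape    # (3, 4, 2): f now runs once over the whole array
#     np.transpose(out, (2, 0, 1))         # inverse permutation, restores the leading batch axes
# If the leading (batch) dimensions come back changed, the result is rejected with NotImplemented.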
[docs] def __invert__(self): # applyS """Configures it so that it expands the arguments out. Example:: # returns 5 [2, 3] | ~aS(lambda x, y: x + y) def f(x, y, a=4): return x*y + a # returns 10 [2, 3] | ~aS(f) # returns 11 [2, 3] | ~aS(f, a=5)""" # applyS if self.inverted: raise Exception("Doesn't support __invert__()ing multiple times") # applyS f = self._fC; a = self.args; kw = self.kwargs; res = aS(lambda x: f(*init.dfGuard(x), *a, **kw)); # applyS res.inverted = True; res.preInvAS = self; return res # applyS
def _jsF(self, meta): # applyS # if len(self.kwargs) > 0: raise Exception("JS does not have the concept of keyword arguments") # applyS # if len(self.args) > 0: raise Exception("aS._jsF() doesn't support *args yet") # applyS fIdx = init._jsFAuto(); dataIdx = init._jsDAuto(); argIdx = init._jsDAuto(); inverted = False # applyS if self.inverted: self = self.preInvAS; inverted = True # applyS # lookup for custom _jsF() functions # applyS header, _fIdx, _async = k1lib.kast.asyncGuard(k1lib.kast.prepareFunc3(self.f, ("aS", meta), self.kwargs, self.args)) # applyS # TODO: might want to inject args right here, on the JS side, instead of on the Python side # applyS if inverted: return f"{header}\n{fIdx} = {'async ' if _async else ''}({dataIdx}) => {{ return {'await ' if _async else ''}{dataIdx}.aSInv{'_async' if _async else ''}({_fIdx}); }}", fIdx # applyS else: return header, _fIdx # applyS
# for x,y in self.pattern | grep("\ue157", sep=True).till("\ue239") | cli.apply(cli.join("")) | cli.filt("x") | cli.apply(lambda x: [x, x.replace("\ue157", "${").replace("\ue239", "}")]): p = p.replace(x, y) # applyS # return f"const {fIdx} = ({dataIdx}) => {dataIdx}.grep(`{p}`)", fIdx # applyS aS = applyS # applyS
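# A minimal illustrative sketch (not part of the library source): besides keyword arguments, aS/applyS
# also forwards extra positional args to f (see __ror__ above, `self._fC(it, *self.args, **self.kwargs)`):
#     3 | aS(lambda x, y: x * y, 4)                 # returns 12, `4` is passed as the 2nd argument
#     3 | aS(lambda x, y, z=1: x + y + z, 4, z=2)   # returns 9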
[docs]class iterDelay(BaseCli): # iterDelay
[docs] def __init__(self, n=5): # iterDelay """Iterates through the array, but makes sure that there's a bit of a delay. Example:: # returns [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] range(10) | iterDelay(5) | deref() At first glance, it's not obvious what's going on. Internally, :class:`iterDelay` will fetch 5 items from the input iterator right away and store them in an internal deque (double-ended queue). Then on the 6th fetch, it will start yielding elements from the deque, hence introducing a delay while iterating through the input. I'll admit that this is pretty niche. I normally would not need such a cli when doing normal analysis, but some clis internally need this, like :class:`apply` when it detects that there's an _all_opt() optimization. Another use case is the prefetch feature of :class:`applyTh`, :class:`applyMp` and :class:`applyCl`""" # iterDelay self.n = n # iterDelay
[docs] def __ror__(self, it):                                                    # iterDelay
        if self.n <= 0: return it                                                # iterDelay
        sentinel = object(); q = deque(); it = iter(init.dfGuard(it))            # iterDelay
        for i in range(self.n):                                                  # iterDelay
            e = next(it, sentinel)                                               # iterDelay
            if e is sentinel: break                                              # iterDelay
            q.append(e)                                                          # iterDelay
        def g():                                                                 # iterDelay
            while True:                                                          # iterDelay
                e = next(it, sentinel)                                           # iterDelay
                if e is sentinel: break                                          # iterDelay
                q.append(e); yield q.popleft()                                   # iterDelay
            while len(q) > 0: yield q.popleft()                                  # iterDelay
        return g()                                                               # iterDelay
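# A minimal illustrative sketch (not part of the library source) that makes the "fetch n items ahead"
# behavior of iterDelay visible, assuming a generator that logs as it's consumed:
#     def src():
#         for i in range(4): print("producing", i); yield i
#     g = src() | iterDelay(2)    # prints "producing 0" and "producing 1" right away (eager prefetch)
#     next(g)                     # prints "producing 2", then returns 0 (a 2-item delay)
# The yielded order is unchanged; only the consumption of the upstream iterator runs ahead.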
# this section of code is a bit magical. The internal dynamics are so damn chaotic, but somehow, they all # iterDelay # work together beautifully # iterDelay def _allOpt_gen(a, q, n:int): # a is a complex, not deref-ed structure. Returns flattened a # _allOpt_gen if n == 0: yield a; return # _allOpt_gen c = 0; idx = len(q); q.append(None) # _allOpt_gen for e in a: c += 1; yield from _allOpt_gen(e, q, n-1) # _allOpt_gen q[idx] = c # _allOpt_gen def _allOpt_genIr(a, n) -> "(flattened structure, ir, depth)": # a is a complex, not deref-ed structure # _allOpt_genIr q = deque(); return _allOpt_gen(a, q, n), q, n # _allOpt_genIr def _allOpt_recover(b:Iterator["data_structure"], ir:"Deque[int]", irIdx:Iterator[int], n): # assumes b and ir are iterators # _allOpt_recover l = next(irIdx) # _allOpt_recover for i in range(ir[l]): yield list(_allOpt_recover(b, ir, irIdx, n-1)) if n-1 > 0 else next(b) # _allOpt_recover def _allOpt_recover(b:Iterator["data_structure"], ir:"Deque[int]", irIdx:Iterator[int], n): # assumes b and ir are iterators # _allOpt_recover l = next(irIdx); c = 0 # _allOpt_recover while True: # _allOpt_recover if ir[l] != None and ir[l] <= c: break # _allOpt_recover c += 1; yield list(_allOpt_recover(b, ir, irIdx, n-1)) if n-1 > 0 else next(b) # that list() right there is quite crucial! # _allOpt_recover def infIter(): # infIter i = 0; # infIter while True: yield i; i += 1 # infIter
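# A minimal illustrative sketch (not part of the library source) of the round trip the helpers above
# perform, assuming a depth-2 nested input: flatten the structure, run a function over the flat stream,
# then rebuild the original nesting from the recorded group sizes in `ir`:
#     data = [[1, 2], [3, 4, 5]]
#     flat, ir, n = _allOpt_genIr(data, 2)               # flat yields 1, 2, 3, 4, 5; ir fills with group sizes
#     flat = flat | iterDelay()                          # keeps `ir` populated ahead of the recovery below
#     out = _allOpt_recover(iter(x * 10 for x in flat), ir, infIter(), n)
#     list(out)                                          # [[10, 20], [30, 40, 50]], same shape as `data`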
[docs]class apply(BaseCli): # apply blurb="Applies a function to all input elements" # apply
[docs] def __init__(self, f:Callable[[Any], Any], column:Union[int, List[int]]=None, cache:int=0, **kwargs): # apply """Applies a function f to every element in the incoming list/iterator. Example:: # returns [0, 1, 4, 9, 16] range(5) | apply(lambda x: x**2) | deref() # returns [[3.0, 1.0, 1.0], [3.0, 1.0, 1.0]], running the function on the 0th column torch.ones(2, 3) | apply(lambda x: x+2, 0) | deref() # returns [[0, -1, 2, 3, -4], [2, -3, 4, 5, -6], [0, -1, 4, 9, -16]], running the function on the 1st (0-index btw) and 4th columns [[0, 1, 2, 3, 4], [2, 3, 4, 5, 6], [0, 1, 4, 9, 16]] | apply(lambda x: -x, [1, 4]) | deref() You can also use this as a decorator, like this:: @apply def f(x): return x**2 # returns [0, 1, 4, 9, 16] range(5) | f | deref() You can also add a cache, like this:: def calc(i): time.sleep(0.5); return i**2 # takes 2.5s range(5) | repeatFrom(2) | apply(calc, cache=10) | deref() # takes 5s range(5) | repeatFrom(2) | apply(calc) | deref() You can add custom keyword arguments into the function:: def f(x, y, z=3): return x + y + z # returns [15, 17, 19, 21, 23] [range(5), range(10, 15)] | transpose() | ~apply(f, z=5) | deref() Slight reminder that you can't pass in extra positional args like in :class:`aS`, just extra keyword arguments. See also: :class:`aS`, :class:`~k1lib.cli.filt.filt` .. admonition:: JS transpiler notes So, because JS don't have the concept of keyword arguments, ``kwargs`` will have its values extracted, then injected as positional arguments in the transpiled JS function. :param column: if not None, then applies the function to that column or columns only :param cache: if specified, then caches this much number of values :param kwargs: extra keyword arguments to pass in the function""" # apply super().__init__(fs=[f]); self.f = f; self.kwargs = kwargs # f is the original operator, _fC is # apply if column: # quick type checks # apply ex = Exception(f"Applying a function on a negative-indexed column ({column}) is not supported") # apply if isinstance(column, int): # apply if column < 0: raise ex # apply else: # apply column = list(column) # apply if len([c for c in column if c < 0]): raise ex # apply self.column = column; self.cache = cache; self._fC = fastF(f) # apply if cache > 0: self._fC = lru_cache(cache)(self._fC) # apply self.normal = self.column is None and self.cache == 0 and len(kwargs) == 0 # cached value to say that this apply is just being used as a wrapper, nothing out of the ordinary, like custom columns, cache or custom kwargs # apply if self.normal: # just propagating information upward, to save runtime graph analysis time # apply try: self._propagatedF = f._propagatedF; self._applyDepth = f._applyDepth + 1 # assuming f is another apply() # apply except: self._propagatedF = f; self._applyDepth = 1 # apply else: self._propagatedF = None; self._applyDepth = 1 # might have to rethink if this depth should be 1 or not # apply # optimization 1: BaseCli._all_array_opt(), aimed at accelerating array types # apply self.__arrayTypeF = None # None for not formulated yet, 0 for cannot formulate a faster operation, else the cached, accelerated function (that might not work) # apply # optimization 2: BaseCli._all_opt(), aimed at accelerating language models # apply self.__allOptF = None # None for not formulated yet, 0 for cannot formulate a faster operation, else the cached, accelerated function (that will guaranteed to work) # apply self.inverted = False; self.preInvApply = None # for ._jsF(), to contain information about the apply() pre 
__invert__(), so that .jsF() can extract out information # apply
@property # apply def _arrayTypeF(self): # optimization 1: returns None or the function (that might not work) # apply if self.__arrayTypeF == 0: return None # apply if self.__arrayTypeF is None: # apply arrs = []; last = self # figure out the depth # apply while isinstance(last, apply) and last.normal: arrs.append(last); last = last.f # apply depth = len(arrs) # apply if depth == 0: self.__arrayTypeF = 0; return None # apply if isinstance(last, cli.serial): # breaks up the serial: (A | B.all(2)).all(3) -> A.all(3) | B.all(5) # apply self.__arrayTypeF = cli.serial(*[(e if isinstance(e, BaseCli) else aS(e)).all(depth) for e in last.clis]); return self.__arrayTypeF # apply else: # it | A.all(3) -> A._all_array_opt(it, 3). This function might return NotImplemented, which means it can't figure out how to utilize the speed up # apply if not hasattr(last, "_all_array_opt"): last = aS(last) # apply self.__arrayTypeF = lambda it: last._all_array_opt(it, depth); return self.__arrayTypeF # apply return self.__arrayTypeF # apply @property # apply def _allOptF(self): # optimization 2: returns None or the function (that has to work all the time!) # apply if self.__allOptF == 0: return None # apply if self._propagatedF is None or not hasattr(self._propagatedF, "_all_opt"): self.__allOptF = 0; return None # apply f = self._propagatedF._all_opt # apply def inner(it): # has to regenerate the IR on each pass through. Slow (O(30*n) or so), but the function this is supposed to run (LLMs), are even slower, so this is fine for now # apply af, ir, n = _allOpt_genIr(it, self._applyDepth) # af = a flat # apply af = af | iterDelay() # apply return _allOpt_recover(iter(f(af)), ir, infIter(), n) # apply return inner # apply def _typehint(self, inp): # apply if self.column is None: # apply if isinstance(inp, tListIterSet): # apply try: return tIter(self.f._typehint(inp.child)) # apply except: return tIter(tAny()) # apply return super()._typehint(inp) # apply def _copy(self): return apply(self.f, self.column, self.cache, **self.kwargs) # ~apply() case handled automatically # apply
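# A minimal illustrative sketch (not part of the library source) of how _arrayTypeF counts depth.
# Nested apply()s with no column/cache/kwargs are "normal", so the chain below is detected as depth 2
# and, for array inputs, rewritten into a single aS(f)._all_array_opt(arr, 2) call (optimization 1):
#     arr | apply(apply(aS(f)))     # instead of looping in Python over the first 2 dimensions
# The serial case from the comment above: (A | B.all(2)).all(3) gets split into A.all(3) | B.all(5).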
[docs] def __ror__(self, it:Iterator[str]): # apply c = self.column; f = self._fC; ogF = self.f; farr = (getattr(ogF, "_all_opt2", None) or f); kwargs = self.kwargs # apply if c is None: # apply if self.normal: # apply if isinstance(it, settings.arrayTypes): # optimization 1 # apply af = self._arrayTypeF # apply if af is not None: # there're lots of code here, but it doesn't impact perf cause it's done once for each array object # apply try: # apply ans = af(it) # apply if ans is not NotImplemented: return ans # apply except init.ArrayOptException as e: raise e # this is a special exception. If the function throws this then the error relates to incorrect constraints, and not accidental, so shouldn't be ignored # apply except: pass # apply self.__arrayTypeF = 0 # tried to use the accelerated version, but failed, so won't ever try the accelerated version again # apply elif self._allOptF is not None: return self._allOptF(it) # optimization 2, for LLMs # apply elif hasPandas and isinstance(it, pd.core.arraylike.OpsMixin): # apply it = init.dfGuard(it) # apply try: # apply res = farr(it) # apply if res is not NotImplemented: return res # apply except: return np.array([f(x) for x in it]) # apply return (f(line, **kwargs) for line in init.dfGuard(it)) # apply elif isinstance(c, int): # apply if hasPandas and isinstance(it, pd.DataFrame): # apply it = it.copy(); colN = list(it)[c] # apply try: it[colN] = f(it[colN]) # apply except: it[colN] = [f(x) for x in it[colN]] # apply return it # apply def gen(it): # apply for row in it: row = list(row); row[c] = f(row[c], **kwargs); yield row # apply return gen(it) # return ([(e if i != c else f(e, **kwargs)) for i, e in enumerate(row)] for row in it) # old version # apply else: # List[int] # apply if hasPandas and isinstance(it, pd.DataFrame): # apply it = it.copy() # apply for c_ in c: # apply colN = list(it)[c_] # apply try: it[colN] = f(it[colN]) # apply except: it[colN] = [f(x) for x in it[colN]] # apply return it # apply def gen(it): # apply for row in it: # apply row = list(row) # apply for c_ in c: row[c_] = f(row[c_], **kwargs) # apply yield row # apply return gen(it) # apply
[docs] def __invert__(self): # apply """Same mechanism as in :class:`applyS`: it expands the arguments out. Just for convenience really. Example:: # returns [10, 12, 14, 16, 18] [range(5), range(10, 15)] | transpose() | ~apply(lambda x, y: x+y) | deref()""" # apply if self.inverted: raise Exception("Doesn't support __invert__()ing multiple times") # apply res = apply(lambda x: self._fC(*x, **self.kwargs), self.column, self.cache) # apply res.preInvApply = self; res.inverted = True; return res # apply
def _jsF(self, meta): # apply if self.cache != 0: raise Exception("apply._jsF() doesn't support caching values yet") # apply fIdx = init._jsFAuto(); dataIdx = init._jsDAuto(); kwIdx = init._jsDAuto(); argIdx = init._jsDAuto(); inverted = False # apply if self.inverted: self = self.preInvApply; inverted = True # apply header, _fIdx, _async = k1lib.kast.asyncGuard(k1lib.kast.prepareFunc3(self.f, ("apply", meta), self.kwargs)) # apply return f"{header}\n{kwIdx} = {json.dumps(self.kwargs)};\n{fIdx} = {'async ' if _async else ''}({dataIdx}) => {dataIdx}.apply{'_async' if _async else ''}({'async ' if _async else ''}({argIdx}) => {'await ' if _async else ''}{_fIdx}({'...' if inverted else ''}{argIdx}), {cli.kjs.v(self.column)}, {kwIdx}, false)", fIdx # apply # apply # old code below, with args (self, kast, jsFnVars:"list[str]", **kwargs) # apply argVars = kast.kast_lambda(self.f); var = ",".join(argVars) # apply fn, header = kast.kast_prepareFunc(self.f, [*argVars, *jsFnVars]) # apply col = self.column # apply if col is None: return f".apply(({var}) => {fn})", header # apply else: # apply cols = [col] if isinstance(col, int) else col # apply return "".join([f".apply(({var}) => {fn}, {c})" for c in cols]), header # apply
cloudpickle = k1lib.dep("cloudpickle", url="https://github.com/cloudpipe/cloudpickle") # apply
def executeFunc(common, line, usingDill):                                        # executeFunc
    import time                                                                  # executeFunc
    if usingDill:                                                                # executeFunc
        import dill; f, kwargs = dill.loads(common)                              # executeFunc
        res = f(dill.loads(line), **kwargs)                                      # executeFunc
    else:                                                                        # executeFunc
        import cloudpickle; f, kwargs = cloudpickle.loads(common)                # executeFunc
        res = f(cloudpickle.loads(line), **kwargs)                               # executeFunc
    time.sleep(0.1); return res # suggestion by https://stackoverflow.com/questions/36359528/broken-pipe-error-with-multiprocessing-queue # executeFunc
def terminateGraceful(): signal.signal(signal.SIGINT, signal.SIG_IGN)            # terminateGraceful
_k1_applyMp_global_ctx = {}; _k1_applyMp_global_ctx_autoInc = k1lib.AutoIncrement(prefix="_k1_applyMp") # terminateGraceful
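# A minimal illustrative sketch (not part of the library source) of the wire format used by applyMp
# below: the function and its kwargs are serialized once ("common"), each input element is serialized
# per job, and executeFunc() reassembles them inside the worker process. Hypothetical values:
#     common = dill.dumps([lambda x, bias=0: x + bias, {"bias": 5}])
#     executeFunc(common, dill.dumps(3), usingDill=True)     # returns 8 (after the 0.1s sleep)
# cloudpickle is used as a fallback whenever dill can't serialize f (see __ror__ below).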
[docs]class applyMp(BaseCli): # applyMp blurb="Applies a function to all input elements across multiple processes" # applyMp _pools = set() # applyMp _torchNumThreads = None # applyMp
[docs] def __init__(self, f:Callable[[Any], Any], prefetch:int=None, timeout:float=8, utilization:float=0.8, bs:int=1, newPoolEvery:int=0, **kwargs): # applyMp """Like :class:`apply`, but execute a function over the input iterator in multiple processes. Example:: # returns [3, 2] ["abc", "de"] | applyMp(len) | deref() # returns [5, 6, 9] range(3) | applyMp(lambda x, bias: x**2+bias, bias=5) | deref() # returns [[1, 2, 3], [1, 2, 3]], demonstrating outside vars work someList = [1, 2, 3] ["abc", "de"] | applyMp(lambda s: someList) | deref() Internally, this will continuously spawn new jobs up until 80% of all CPU cores are utilized. On posix systems, the default multiprocessing start method is ``fork()``. This sort of means that all the variables in memory will be copied over. On windows and macos, the default start method is ``spawn``, meaning each child process is a completely new interpreter, so you have to pass in all required variables and reimport every dependencies. Read more at https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods If you don't wish to schedule all jobs at once, you can specify a ``prefetch`` amount, and it will only schedule that much jobs ahead of time. Example:: range(10000) | applyMp(lambda x: x**2) | head() | deref() # 700ms range(10000) | applyMp(lambda x: x**2, 5) | head() | deref() # 300ms # demonstrating there're no huge penalties even if we want all results at the same time range(10000) | applyMp(lambda x: x**2) | deref() # 900ms range(10000) | applyMp(lambda x: x**2, 5) | deref() # 1000ms The first line will schedule all jobs at once, and thus will require more RAM and compute power, even though we discard most of the results anyway (the :class:`~k1lib.cli.filt.head` cli). The second line only schedules 5 jobs ahead of time, and thus will be extremely more efficient if you don't need all results right away. .. note:: Remember that every :class:`~k1lib.cli.init.BaseCli` is also a function, meaning that you can do stuff like:: # returns [['ab', 'ac']] [["ab", "cd", "ac"]] | applyMp(filt(op().startswith("a")) | deref()) | deref() Also remember that the return result of ``f`` should be serializable, meaning it should not be a generator. That's why in the example above, there's a ``deref()`` inside f. You should also convert PyTorch tensors into Numpy arrays Most of the time, you would probably want to specify ``bs`` to something bigger than 1 (may be 32 or sth like that). This will executes ``f`` multiple times in a single job, instead of executing ``f`` only once per job. Should reduce overhead of process creation dramatically. If you encounter strange errors not seen on :class:`apply`, you can try to clear all pools (using :meth:`clearPools`), to terminate all child processes and thus free resources. On earlier versions, you have to do this manually before exiting, but now :class:`applyMp` is much more robust. Also, you should not immediately assume that :class:`applyMp` will always be faster than :class:`apply`. Remember that :class:`applyMp` will create new processes, serialize and transfer data to them, execute it, then transfer data back. If your code transfers a lot of data back and forth (compared to the amount of computation done), or the child processes don't have a lot of stuff to do before returning, it may very well be a lot slower than :class:`apply`. There's a potential loophole here that can make your code faster. Because the main process is forked (at least on linux), every variable is still there, even the big ones. 
So, you can potentially do something like this:: bigData = [] # 1B items in the list # summing up all items together. No input data transfers (because it's forked instead) range(1_000_000_000) | batched(100) | applyMp(lambda r: r | apply(lambda i: bigData[i]) | toSum()) | toSum() In fact, I use this loophole all the time, and thus has made the function :meth:`shared`, so check it out. :param prefetch: if not specified, schedules all jobs at the same time. If specified, schedules jobs so that there'll only be a specified amount of jobs, and will only schedule more if results are actually being used. :param timeout: seconds to wait for job before raising an error :param utilization: how many percent cores are we running? 0 for no cores, 1 for all the cores. Defaulted to 0.8 :param bs: if specified, groups ``bs`` number of transforms into 1 job to be more efficient. :param kwargs: extra arguments to be passed to the function. ``args`` not included as there're a couple of options you can pass for this cli. :param newPoolEvery: creates a new processing pool for every specific amount of input fed. 0 for not refreshing any pools at all. Turn this on in case your process consumes lots of memory and you have to kill them eventually to free up some memory""" # applyMp super().__init__(fs=[f]); self.f = fastF(f) # applyMp self.prefetch = prefetch or int(1e9) # applyMp self.timeout = timeout; self.utilization = utilization # applyMp self.bs = bs; self.kwargs = kwargs; self.p = None # applyMp self.newPoolEvery = newPoolEvery; self.ps = []; self._serializeF = True # applyMp
[docs] def __ror__(self, it:Iterator[Any]) -> Iterator[Any]: # applyMp timeout = self.timeout; f = self.f # really make sure it's an iterator, for prefetch # applyMp if self.bs > 1: return it | cli.batched(self.bs, True) | applyMp(apply(f) | cli.toList(), self.prefetch, timeout, **self.kwargs) | cli.joinStreams() # applyMp def newPool(): # applyMp if hasTorch: # applyMp try: applyMp._torchNumThreads = applyMp._torchNumThreads or torch.get_num_threads(); torch.set_num_threads(1) # applyMp except: pass # why do all of this? Because some strange interaction between PyTorch and multiprocessing, outlined here: https://github.com/pytorch/pytorch/issues/82843 # applyMp os.environ["py_k1lib_in_applyMp"] = "True" # applyMp self.p = mp.Pool(int(mp.cpu_count()*self.utilization), terminateGraceful); self.ps.append(self.p) # applyMp if hasTorch and applyMp._torchNumThreads is not None: torch.set_num_threads(applyMp._torchNumThreads) # applyMp def intercept(it, n): # applyMp for i, e in enumerate(it): # applyMp if i % n == 0: # applyMp if self.p is not None: self.p.close(); self.ps.remove(self.p) # applyMp gc.collect(); newPool() # applyMp yield e # applyMp try: common = dill.dumps([f, self.kwargs]); usingDill = True # applyMp except: common = cloudpickle.dumps([f, self.kwargs]); usingDill = False # applyMp def gen(it): # applyMp with k1lib.captureStdout(False, True) as out: # applyMp try: # applyMp if self.newPoolEvery > 0: it = intercept(it, self.newPoolEvery) # applyMp else: newPool() # applyMp yield from it | apply(lambda line: self.p.apply_async(executeFunc, [common, dill.dumps(line), usingDill])) | iterDelay(self.prefetch) | apply(lambda x: x.get(timeout)) # applyMp except KeyboardInterrupt as e: # applyMp print("applyMp interrupted. Terminating pool now") # applyMp for p in self.ps: p.close(); p.terminate(); # applyMp raise e # applyMp except Exception as e: # applyMp print("applyMp encounter errors. Terminating pool now") # applyMp for p in self.ps: p.close(); p.terminate(); # applyMp raise e # applyMp else: # applyMp for p in self.ps: p.close(); p.terminate(); # applyMp return gen(it) # applyMp
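# A minimal illustrative sketch (not part of the library source) of what the `bs` parameter does
# internally (first line of __ror__ above): a batched applyMp is rewritten so each job handles `bs`
# elements, amortizing process and serialization overhead over many calls of f per job:
#     range(1000) | applyMp(f, bs=32)
#     # behaves like:
#     range(1000) | batched(32, True) | applyMp(apply(f) | toList()) | joinStreams()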
[docs] @staticmethod # applyMp def cat(fileName: str, f:Callable, n:int=None, rS=None, **kwargs): # applyMp """Like :meth:`applyCl.cat`, this will split a file up into multiple sections, execute ``f`` over all sections and return the results. Example:: fn = "~/repos/labs/k1lib/k1lib/cli/test/applyMp.cat" "0123456789\\n"*100 | file(fn) # returns [6, 6, 6, 7, 6, 6, 6, 7, 6, 6, 6, 7, 6, 6, 6, 8] applyMp.cat(fn, shape(0), 16) | deref() :param f: function to execute on an iterator of lines :param n: how many chunks should it split the file into. Defaulted to the number of cpu cores available :param rS: :class:`~k1lib.cli.inp.refineSeek` instance, if you need more fine-grained control over section boundaries so as to not make everything corrupted :param kwargs: extra keyword arguments for :class:`applyMp`""" # applyMp return fileName | cli.splitSeek(n or os.cpu_count()) | (rS or cli.iden()) | cli.window(2) | ~applyMp(lambda x,y: cli.cat(fileName, sB=x, eB=y) | f, **kwargs) # applyMp
[docs] @staticmethod # applyMp def shared(f, **kwargs): # applyMp """Execution model where the input iterator is dereferenced and shared across all processes, bypassing serialization. Example:: a = range(1_000_000_000) | apply(lambda x: x*1.5 - 2000) | aS(list) # giant data structure a | batched(50_000_000, True) | applyMp(toSum()) | toSum() # has to serialize and deserialize lists of numbers, which wastes lots of cpu cycles and memory a | applyMp.shared(toSum()) | toSum() # giant data structure is forked, no serialization happens, no memory even gets copied, much faster In the 2nd line, most of the time is spent on serializing the data and transferring it to other processes, while in the 3rd line, most of the time is spent on calculating the sum instead, as the giant data structure is forked, and Linux doesn't copy it internally.""" # applyMp def inner(it): # applyMp try: n = len(it) # applyMp except: it = list(it); n = len(it) # applyMp # this is pretty unintuitive right? Why do it this way? Turns out, if you were to reference `it` directly, it will store it in f's co_freevars, # applyMp # which will be serialized, defeating the purpose. Moving it to a global variable forces it to move to co_names instead, avoiding serialization. This took forever to understand # applyMp idx = _k1_applyMp_global_ctx_autoInc(); _k1_applyMp_global_ctx[idx] = it # applyMp res = range(n) | cli.batched(round(n/os.cpu_count()+1), True) | applyMp(lambda r: f(_k1_applyMp_global_ctx[idx][r.start:r.stop]), **kwargs) | aS(list) # applyMp _k1_applyMp_global_ctx[idx] = None; return res # applyMp return aS(inner) # applyMp
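# A minimal illustrative sketch (not part of the library source) of the closure-vs-global point made
# in the comment above: a lambda that captures a local carries it in its closure (co_freevars), so the
# whole data structure would be pickled along with it, while referencing a module-level dict only
# stores a name (co_names):
#     def capture(data): return lambda r: data[r]
#     capture([1, 2, 3]).__code__.co_freevars    # ('data',) -> the list travels with the pickled lambda
#     g = lambda r: _k1_applyMp_global_ctx[0][r]
#     g.__code__.co_freevars                     # ()        -> nothing big gets serialized by value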
def _copy(self): return applyMp(self.f, self.prefetch, self.timeout, self.utilization, self.bs, self.newPoolEvery, **self.kwargs) # applyMp
[docs] def __invert__(self): # applyMp """Expands the arguments out, just like :class:`apply`. Example:: # returns [20, 20, 18, 14, 8, 0, -10, -22, -36, -52] [range(10), range(20, 30)] | transpose() | ~applyMp(lambda x, y: y-x**2) | deref()""" # applyMp res = self._copy(); f = res.f; res.f = lambda x: f(*x); return res # applyMp
[docs] @staticmethod # applyMp def clearPools(): # applyMp """Terminate all existing pools. Do this before restarting/quitting the script/notebook to make sure all resources (like GPU) are freed. **Update**: you probably won't have to call this manually anymore since version 0.9, but if you run into problems, try doing this.""" # applyMp for p in applyMp._pools: # applyMp try: p.terminate() # applyMp except: pass # applyMp applyMp._pools = set() # applyMp
[docs] @staticmethod # applyMp def pools(): # applyMp """Get set of all pools. Meant for debugging purposes only.""" # applyMp return applyMp._pools # applyMp
    def __del__(self):                                                           # applyMp
        return                                                                   # applyMp
        if hasattr(self, "p"):                                                   # applyMp
            self.p.terminate();                                                  # applyMp
            if self.p in applyMp._pools: applyMp._pools.remove(self.p)           # applyMp
# apparently, this doesn't do anything, at least in jupyter environment # applyMp atexit.register(lambda: applyMp.clearPools()) # applyMp parallel = applyMp # applyMp s = k1lib.Settings(); settings.add("applyCl", s, "modifier.applyCl() settings") # applyMp s.add("sudoTimeout", 300, "seconds before deleting the stored password for sudo commands") # applyMp s.add("cpuLimit", None, "if specified (int), will not schedule more jobs if the current number of assigned cpus exceeds this") # applyMp _password = k1lib.Wrapper(None); _cpuUsed = k1lib.Wrapper(0) # applyMp def removePw(): # removePw while True: time.sleep(settings.applyCl.sudoTimeout); _password.value = None # removePw t = threading.Thread(target=removePw, daemon=True).start() # removePw _nodeIdsCache = k1lib.Wrapper([]) # removePw def specificNode(f, nodeId:str, num_gpus=0): # modify a function so that it will only run on a specific node only # specificNode if num_gpus > 0: # specificNode #return f.options(num_gpus=num_gpus, scheduling_strategy="SPREAD") # specificNode return f.options(num_gpus=num_gpus, scheduling_strategy=ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(node_id=nodeId, soft=False)) # specificNode else: return f.options(scheduling_strategy=ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(node_id=nodeId, soft=False)) # specificNode def exportSe(se): # exportSe if not isinstance(se, k1lib.Settings): return se # exportSe return {k:exportSe(v) for k,v in se.__dict__.items() if not k.startswith("_") and k != "kjs"} # excluding dicts of kjs transpilers cause that causes problems for applyCl() as it's using cloudpickle # exportSe def movePropsSe(obj, se): # movePropsSe d = se.__dict__; keys = [e for e in d.keys() if not e.startswith("_")] # movePropsSe for key in keys: # movePropsSe if key not in obj or key == "kjs": continue # movePropsSe if isinstance(d[key], k1lib.Settings): movePropsSe(obj[key], d[key]) # movePropsSe else: d[key] = obj[key] # movePropsSe _applyCl_soCache = set() # dynamic library (.so) that has been installed across all nodes, so don't have to reimport # movePropsSe
[docs]class applyCl(BaseCli): # applyCl blurb="Applies a function to all input elements across a Ray cluster" # applyCl
[docs] def __init__(self, f, prefetch=None, timeout=60, bs=1, rss:Union[dict, str]={}, pre:bool=False, num_cpus=1, num_gpus=0, memory=None, resolve=True, **kwargs): # applyCl """Like :class:`apply`, but execute a function over the input iterator in multiple processes on multiple nodes inside of a cluster (hence "cl"). So, just a more powerful version of :class:`applyMp`, assuming you have a cluster to run it on. Example:: # returns [3, 2] ["abc", "de"] | applyCl(len) | deref() # returns [5, 6, 9] range(3) | applyCl(lambda x, bias: x**2+bias, bias=5) | deref() # returns [[1, 2, 3], [1, 2, 3]], demonstrating outside vars work someList = [1, 2, 3] ["abc", "de"] | applyCl(lambda s: someList) | deref() nIds = applyCl.nodeIds() # returns [[<nodeId1>, 0], [<nodeId2>, 1], ...], demonstrating preserve mode [nIds, range(10)] | transpose() | applyCl(lambda x: x**2, pre=True) | deref() # executes the function, but stores the result on remote nodes, instead of copying result to this node a = range(5) | applyCl(lambda x: x**2, resolve=False) | deref() # returns [0, 1, 4, 9, 16] a | applyCl(lambda x: x) | deref() Summary of all mode of operations:: # Data types: # - 1: literal value, just a normal Python object # - or1: ray.ObjectRef object - Ray's reference to a remote object living somewhere # - h1: Handle object - k1lib's reference to a remote object, obtained if `resolve` is set to False. Use `h.get()` to # - n1: node id, string [1/or1/h1, 2/or2/h2] | applyCl(...) # returns [1, 2, 3]. "1/or1/h1" means that the input can be a list of literals, ObjectRef, or Handle [1/or1/h1, 2/or2/h2] | applyCl(..., resolve=False) # returns [h1, h2, h3] [[n1/h1, 1/or1/h3], [n2/h2, 2/or2/h4]] | applyCl(..., pre=True) # returns [[n1/h1, 1], [n2/h2, 2]], executed on n1/h1, h3 is copied over [[n1/h1, 1/or1/h3], [n2/h2, 2/or2/h4]] | applyCl(..., pre=True, resolve=False) # returns [[n1/h1, h3], [n2/h2, h4]] [n1, n2] | applyCl.aS(lambda: ...) # returns [[n1, 1], [n2, 2]] None | applyCl.aS(lambda: ...) # returns [[n1, 1], [n2, 2], ...], executes once on all nodes [n1, n2] | applyCl.aS(lambda: ..., resolve=False) # returns [[n1, h1], [n2, h2]] Internally, this uses the library Ray (https://www.ray.io) to do the heavy lifting. So, :class:`applyCl` can be thought of as a thin wrapper around that library, but still has the same consistent interface as :class:`apply` and :class:`applyMp`. From all of my tests so far, it seems that :class:`applyCl` works quite well and is quite robust, so if you have access to a cluster, use it over :class:`applyMp`. The library will connect to a Ray cluster automatically when you import everything using ``from k1lib.imports import *``. It will execute ``import ray; ray.init()``, which is quite simple. If you have ray installed, but does not want this default behavior, you can do this:: import k1lib k1lib.settings.startup.init_ray = False from k1lib.imports import * As with :class:`applyMp`, there are pitfalls and weird quirks to multiprocessing, on 1 or multiple nodes, so check out the docs over there to be aware of them, as those translates well to here. There're more extensive documentation on these notebooks: `27-multi-node <https://mlexps.com/other/27-multi-node/>`_, `30-applyCl-benchmarks <https://mlexps.com/other/30-applyCl-benchmarks/>`_, if you want to kinda get the feel of this tool more. .. admonition:: Time sharing the cluster Let's say that the cluster is located in a company, and that multiple users want to access it, then you might have to think about it a little more. 
Say the cluster has 60 cores, and someone has launched a long-running job: 2160 tasks, 10 minutes/task, 1 core/task, totalling 6 hours. If you want to launch another job that has 20 tasks, requiring 10 cores, 1 second/task, totalling 2 seconds on an idle cluster. All modern schedulers (Ray, Slurm, Spark, etc) can't schedule your 20 tasks immediately. It has to wait for some running tasks to finish to schedule your task. This means you have to wait on average for 5-10 minutes before all of your tasks finish. This might be fine if you've used Slurm a lot, but extremely not okay for me and my patience. The whole point of a cluster is to get results immediately, within a few seconds. So here's a workaround:: # long running task, on notebook 1 from k1lib.imports import * settings.cli.applyCl.cpuLimit = 40 range(2160) | applyCl(lambda x: time.sleep(10*60)) | ignore() # long running task # short running task, on notebook 2 from k1lib.imports import * range(20) | applyCl(lambda x: time.sleep(1)) | ignore() # short running task, should finishes almost immediately Essentially, there's that setting that you can adjust. Like with Ray's ``num_cpus``, this is merely a suggestion to my library to not schedule jobs past that cpu limit, but you can circumvent it in some strange edge cases that I'm too lazy to implement. Likewise, when you schedule a Ray task, you can specify that it will only take 1 cpu, but you can end up forking it into 5 different processes, which can cause congestion and memory thrashing. If Ray doesn't do it right (possibly impossible to do so anyway) then do I really have to? .. admonition:: Advanced use case Not really advanced, but just a bit difficult to understand/follow. Let's say that you want to scan through the home directory of all nodes, grab all files, read them, and get the number of bytes they have. You can do something like this:: a = None | applyCl.aS(lambda: None | cmd("ls ~") | filt(os.path.isfile) | deref()) | deref() b = a | ungroup() | deref() c = b | applyCl(cat(text=False) | shape(0), pre=True) | deref() d = c | groupBy(0, True) | apply(item().all() | toSum(), 1) | deref() Noted, this is relatively complex. Let's see what A, B, C and D looks like:: # A [['7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d', ['Miniconda3-latest-Linux-x86_64.sh', 'mintupgrade-2023-04-01T232950.log']], ['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', ['5a', 'abc.jpg', 'a.txt']]] # B [['7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d', 'Miniconda3-latest-Linux-x86_64.sh'], ['7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d', 'mintupgrade-2023-04-01T232950.log'], ['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', '5a'], ['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 'abc.jpg'], ['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 'a.txt']] # C [['7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d', 74403966], ['7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d', 1065252], ['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 2601], ['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 16341], ['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 10177]] # D [['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 92185432], ['7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d', 75469218]] The steps we're concerned with is A and C. In step A, we're running 2 processes, 1 for each node, to get all the file names in the home directory. 
In step C, we're running 5 processes total, 2 on the first node and 3 on the second node. For each process, it's going to read as bytes and count up those bytes. Finally in step D, the results are grouped together and the sizes summed. So yeah, it's pretty nice that we did all of that in a relatively short amount of code. The data is distributed too (reading multiple files from multiple nodes), so we're truly not bottlenecked by anything. .. admonition:: Context object handle Let's say you have these unresolved handles:: # creates a bunch of infinite random number generators, one on each node its = None | applyCl.aS(lambda: repeatF(lambda: random.randint(2, 15)), resolve=False) | deref() # gets the next value of all generators, can return [4, 4, 9, 2] for a 4-node cluster its | cut(1) | applyCl(lambda x: next(x)) | deref() # gets the next value of all generators, add 0 to the 1st generator, 1 to the 2nd generator, etc, then return the resulting output that might look like [3, 16, 10, 7] [its | cut(1), range(4)] | transpose() | applyCl(lambda x: next(ctxHandle) + x, pre=True) | cut(1) | deref() So, the special thing about this is that variable `ctxHandle` on the last line. That is a special variable that is injected on the way. Why all this complexity? The whole idea with unresolved object handles is that you have a distributed complex data structure that can't be serialized and juggle around easily. That's the `its` handles in the example. Then, you might want to feed in some (simple, serializable) input X, change the complex data structure in its own process, then return some (simple, serializable) output Y. In the example, X is range(4), while Y is the resulting number array. .. admonition:: Cython Even with running everything distributedly like this, you might run into speed issues. Then, you'll essentially have 2 options. First is to write (pleasant) Cython code, or second is to write (unpleasant) C/C++ Python extensions. If you were to choose the C/C++ option, then here's the flow: - Develop Python C extension, export everything as a shared library (a single .so file) - Execute ``applyCl.installSo("library.so")`` to install the library to all nodes - Use functions provided by your library normally, like ``import yourlibrary; range(10) | applyCl(yourlibrary.afunction) | deref()`` But applyCl can deal with cython functions directly in your notebook. Here's the flow: - Annotate a code cell with the magic "%%cython", write Cython code as usual - Just use that function normally Let's see an example:: # ---------- code cell 1 ---------- from k1lib.imports import * # cython ipython extension is automatically loaded # ---------- code cell 2 ---------- %%cython from k1lib.cli import ls # demonstrating that you can use all of the existing tools and libraries as usual cdef g(a:int): return f"{a} 123" # demonstrating that you can refactor out to other functions def f (a:int): return [g(a), ls(".")] # ---------- code cell 3 ---------- range(10) | applyCl(f) | deref() You only have to install Cython on the current node and not the other nodes. Also note that currently, this only supports you passing in Cython-compiled functions directly into ``applyCl()`` or ``applyCl.aS()``. 
You can't pass a normal Python function that uses a Cython function like this:: # ---------- code cell 1 ---------- from k1lib.imports import * # ---------- code cell 2 ---------- %%cython from k1lib.cli import ls # note: have to reimport here because all the symbols inside this code block is independent from the rest of the notebook cpdef g(a:int): return f"{a} 123" # ---------- code cell 3 ---------- def f (a:int): return [g(a), ls(".")] range(10) | applyCl(f) | deref() # this throws an import error, as the compiled code won't be installed on the remote nodes This behavior can potentially be fixed in the future, but I'm lazy and it's not a hard thing to follow the rules. The dynamic library will be installed in the working directory. You can delete them after a coding session to free up some space, but they're likely to be tiny, so you don't really have to worry about it. Also, like everything else in parallel programming, please benchmark absolutely everything because it might even be slower using Cython if internally you're allocating space for large data structures constantly, compared to cli tool's lazy execution model. For operations that work on giant files, I actually find it very difficult to gain any appreciable speedups using Cython, as cli tools are already pretty optimized, so best task for this is probably long-running, complex mathematical modelling, and not generic text manipulation. .. warning:: Just like with any other parallel processing model, there are some quirks that can happen behind the scenes that aren't quite what you expected, as this is incredibly tricky. Dig into Ray's serialization page (https://docs.ray.io/en/latest/ray-core/objects/serialization.html) or their whitepapers (https://docs.ray.io/en/latest/ray-contribute/whitepaper.html) to get a feel for how it all works underneath. The notable quirks that you might need to think about is: - A lot of the internal code assumes that you're on a Unix system, preferably Linux, so it might not work on other platforms like Windows. But honestly, screw Windows. :param prefetch: if not specified, schedules all jobs at the same time. If specified, schedules jobs so that there'll only be a specified amount of jobs, and will only schedule more if results are actually being used. :param timeout: seconds to wait for job before raising an error :param bs: if specified, groups ``bs`` number of transforms into 1 job to be more efficient. :param rss: resources required for the task. Can be {"custom_resource1": 2} or "custom_resource1" as a shortcut :param pre: "preserve", same convention as :meth:`applyCl.aS`. If True, then allow passing through node ids as the first column to shedule jobs on those specific nodes only :param num_cpus: how many cpu does each task take? :param memory: how much memory to give to the task in bytes? :param resolve: whether to resolve the outputs or not. Set this to False to not move memory to the requesting node and cache the big data structure on the remote node :param kwargs: extra arguments to be passed to the function. 
``args`` not included as there're a couple of options you can pass for this cli.""" # applyCl super().__init__(fs=[f]); _fC = fastF(f); self.ogF = f; self.pre = pre; self.num_cpus = num_cpus # applyCl try: # f might be a string, so can't do f.__module__ # applyCl isCythonFunc = "cython" in f.__module__ # applyCl if isCythonFunc: applyCl.installSo(sys.modules[f.__module__].__file__) # applyCl except: isCythonFunc = False # applyCl self.rss = rss = {rss: 1} if isinstance(rss, str) else rss # applyCl cwd = os.getcwd(); se = exportSe(k1lib.settings) # applyCl def remoteF(s, e, idxCtxHandle=None): # function that will be executed on remote node. Have to setup environment a little bit before executing # applyCl # e: the real element. s is just e's storage context, in case e is a Handle. Else s is not used and can be None. Why? Because Actors can't be # applyCl # serialized directly with cloudpickle, but it can be passed as function parameters, which Ray will do some special sauce serialization # applyCl import k1lib; movePropsSe(se, k1lib.settings) # do this to sync current settings with the remote worker nodes # applyCl if k1lib.settings.startup.or_patch.numpy: k1lib.cli.init.patchNumpy() # applyCl if k1lib.settings.startup.or_patch.dict: k1lib.cli.init.patchDict() # applyCl import os; os.makedirs(cwd, exist_ok = True); os.chdir(cwd) # applyCl if isinstance(e, Handle): e.setStorage(s); e = e.get() # applyCl if idxCtxHandle: _fC.__globals__["ctxHandle"] = s.d[idxCtxHandle] # applyCl return _fC(e, **kwargs) # applyCl # self.remoteF = remoteF; f = ray.remote(resources=rss, num_cpus=num_cpus, **({"memory": memory} if memory else {}))(remoteF) # applyCl rssKw = {"resources": rss, "num_cpus": num_cpus, "num_gpus": num_gpus, **({"memory": memory} if memory else {})}#; rssKw = None # applyCl self.prefetch = prefetch or int(1e8); self.timeout = timeout; self.bs = bs # applyCl self._copyCtx = lambda: [f, [prefetch, timeout, bs, rss, pre, num_cpus, num_gpus, memory, resolve], kwargs] # applyCl nodeId = applyCl.nodeId(); rssF = ray.remote(**rssKw)(remoteF) # f that has constraints injected into it # applyCl def preprocessF(e): # return Handle (if pre=False), or [nodeId, Handle] (if pre=True). f is remoteF, core element can be a Handle, real object, or ObjectRef # applyCl if resolve: # storage location managed by ray, returns or2/h2 or [nId/h1, or2/h2] # applyCl if pre: # applyCl a, b = e; s = extractStorage(b) # applyCl if isinstance(a, Handle): h = a.deposit(b); return [a, h.executeAsync(remoteF, rssKw, a.idx)], [a,h] # applyCl else: return [a, specificNode(rssF, a, num_gpus=num_gpus).remote(s, b)], [] # applyCl else: # applyCl if isinstance(e, Handle): return e.executeAsync(remoteF, rssKw), [e] # applyCl else: return rssF.remote(None, e), [] # applyCl else: # storage location explicitly managed by me, returns h2 or [nId/h1, h2] # applyCl storageWarmup() # applyCl if pre: # applyCl a, b = e; s = extractStorage(b) # applyCl if isinstance(a, Handle): # [h1, 2/or2/h2] # applyCl h = a.deposit(b) # first deposits b into a's storage context to get handle. 
h and a have the same storage context # applyCl a.report("report7"); b.report("report7") # applyCl return [a, h.executeAsync(remoteF, rssKw, a.idx)], [a,b,h] # then executes it in h's storage context # applyCl else: # [nId, 2/or2/h2] # applyCl if isinstance(b, Handle) and b.nodeId == nodeId: return [a, b.executeAsync(remoteF, rssKw)], [b] # [nId, h2], if h2 is on nId, then use h2's storage context # applyCl h = Handle.create(b, a, num_gpus=num_gpus); return [a, h.executeAsync(remoteF, rssKw)], [h] # create storage context on `a`, deposits b on it, then execute # applyCl else: # 1/or1/h1 # applyCl s = extractStorage(e) # applyCl if isinstance(e, Handle): return e.executeAsync(remoteF, rssKw), [e] # has storage context already, execute on it directly # applyCl else: h = Handle.create(e, num_gpus=num_gpus); return h.executeAsync(remoteF, rssKw), [h] # create storage context, deposit e on it, then execute # applyCl @ray.remote # applyCl def resolveFRemote(o): return 1 # applyCl def resolveF(e): # applyCl e = e[0] # why do this? Because of a pretty obscure bug related to the reference counting mechanism I have here # applyCl # in preprocessF(), it returns [meats, Python references to (potential) Handles] # applyCl # I have to do this in order to keep the Handles alive. After resolveF(), everything is settled, so # applyCl # old Handles can safely be deleted # applyCl if resolve: # applyCl if pre: a, b = e; return [a, b.block().get() if isinstance(b, Handle) else ray.get(b)] # applyCl else: return e.block().get() if isinstance(e, Handle) else ray.get(e) # applyCl else: # applyCl return [e[0], e[1].block()] if pre else e.block() # don't resolve to this node, but still block execution until that object is resolvable # applyCl self.preprocessF = preprocessF; self.resolveF = resolveF; applyCl.preprocessF = preprocessF; applyCl.resolveF = resolveF # references for lprun so that I can benchmark these 2 functions # applyCl
[docs] @staticmethod # applyCl def installSo(fn:str, force:bool=False): # applyCl """Installs a dynamic library (.so file) on all nodes. :param fn: file name of the shared library :param force: force a reinstall even if the library is already on the remote node""" # applyCl basename = os.path.basename(fn) # applyCl if not force and basename in _applyCl_soCache: return # applyCl print("Installing dynamic library to all nodes... ", end=""); _applyCl_soCache.add(basename); contents = cli.cat(fn, False) # applyCl None | applyCl.aS(lambda: contents | cli.file(basename)) | cli.ignore(); print("Done") # applyCl
[docs] def __ror__(self, it): # applyCl timeout = self.timeout; bs = self.bs; ogF = self.ogF; preprocessF = self.preprocessF; resolveF = self.resolveF # applyCl if bs > 1: return it | cli.batched(bs, True) | applyCl(lambda x: x | apply(ogF) | cli.aS(list), self.prefetch, timeout) | cli.joinStreams() # applyCl def gen(it): # applyCl futures = deque(); it = iter(it); n = self.num_cpus; limit = settings.applyCl.cpuLimit or int(1e9) # applyCl for i, e in zip(range(min(self.prefetch, (limit-_cpuUsed.value)//n)), it): # try to anticipate how much resources can be consumed ahead of time and only schedule that much, to prevent deadlocks when multiple applyCl() is called, but their parent process has not consumed the yield statement, so the cpu count doesn't get decremented # applyCl while _cpuUsed.value + n > limit: time.sleep(0.1) # this is a very rudimentary lock. Doesn't have to be accurate though, and Python's GIL ensure atomic-ness # applyCl futures.append(preprocessF(e)); _cpuUsed.value += n # applyCl for e in it: yield resolveF(futures.popleft()); futures.append(preprocessF(e)) # free and allocate cpu slot immediately, so no while loop necessary # applyCl for e in futures: res = resolveF(e); _cpuUsed.value -= n; yield res # applyCl applyCl.rorGen = gen # applyCl return gen(it) # applyCl
[docs] def __invert__(self): # applyCl """Expands the arguments out, just like :class:`apply`. Example:: # returns [20, 20, 18, 14, 8, 0, -10, -22, -36, -52] [range(10), range(20, 30)] | transpose() | ~applyCl(lambda x, y: y-x**2) | deref()""" # applyCl f, rest, kwargs = self._copyCtx(); return applyCl(lambda x: f(*x), *rest, **kwargs) # applyCl
[docs] @staticmethod # applyCl def nodeIds(includeSelf=True) -> List[str]: # applyCl """Returns a list of all node ids in the current cluster. Example:: applyCl.nodeIds() # returns something like ['7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d', '1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068'] If you want to get nodes' metadata, then just use ray's builtin function ``ray.nodes()`` :param includeSelf: whether to include node id of the current process or not""" # applyCl res = ray.nodes() | cli.filt(lambda x: x["Alive"]) | apply(lambda x: x["NodeID"]) | aS(list) # applyCl if includeSelf: return res # applyCl res.remove(applyCl.nodeId()); return res # applyCl
[docs] @staticmethod # applyCl @lru_cache # applyCl def nodeId() -> str: # applyCl """Returns current node id""" # applyCl return ray.runtime_context.get_runtime_context().get_node_id() # applyCl
[docs] @staticmethod # applyCl def meta() -> object: # applyCl """Grabs the metadata object for the current node""" # applyCl return ray.nodes() | cli.filt(lambda x: x["NodeID"] == applyCl.nodeId()) | cli.item() # applyCl
[docs] @staticmethod # applyCl def cpu() -> int: # applyCl """Grabs the number of cpus available on this node""" # applyCl return int(applyCl.meta()["Resources"]["CPU"]) # applyCl
[docs] @staticmethod # applyCl def aS(f, **kwargs): # applyCl """Executes function f once for all node ids that are piped in. Example:: # returns [['1051da...', ['Desktop', 'Downloads']], ['7bb387...', ['Pictures', 'Music']]] applyCl.nodeIds() | applyCl.aS(lambda: None | cmd("ls ~") | deref()) | deref() # same as above, demonstrating passing in a list of nodeIds ["1051da...", "7bb387..."] | applyCl.aS(lambda: None | cmd("ls ~") | deref()) | deref() # same as above, demonstrating passing in "None" for all nodeIds in the cluster None | applyCl.aS(lambda: None | cmd("ls ~") | deref()) | deref() If you want to execute f for all nodes, you can pass in None instead. As a reminder, this kinda follows the same logic as the popular cli :class:`aS`, where f is executed once, hence the name "apply Single". Here, the meaning of "single" is different. It just means execute once for each node ids. If you want to quickly execute a function on a single node, without all the fuss, there's this short form that you can follow:: # returns ['Desktop', 'Downloads'], demonstrating that you can also pass in a single node id "1051da..." | applyCl.aS(lambda: None | cmd("ls ~") | deref()) | deref() :param f: main function to execute in each node. Not supposed to accept any arguments :param kwargs: keyword arguments for the main :class:`applyCl` function""" # applyCl f = fastF(f); final = apply(lambda x: [x, 0]) | applyCl(lambda _: f(), pre=True, **kwargs) # applyCl def inner(it): # applyCl shortform = False # applyCl if it is None: it = applyCl.nodeIds() # applyCl if isinstance(it, str): it = [it]; shortform = True # applyCl if it | ~cli.inSet(_nodeIdsCache()) | cli.shape(0) > 0: # caching nodeIds(), because that takes a surprising amount of time # applyCl _nodeIdsCache.value = applyCl.nodeIds(); outliers = it | ~cli.inSet(_nodeIdsCache()) | cli.deref() # applyCl if len(outliers) > 0: raise Exception(f"These nodes cannot be found: {outliers}") # applyCl return it | final | ((cli.cut(1) | cli.item()) if shortform else cli.iden()) # applyCl return aS(inner) # applyCl
[docs] @staticmethod # applyCl def cmd(s:str, sudo=False, nodeIds=None, **kwargs): # applyCl """Convenience function to execute shell command on all nodes. Example:: applyCl.cmd("mkdir -p /some/folder") It returns [[nodeid1, output1], [nodeid2, output2]]. If you need more flexibility, fall back to :meth:`applyCl.aS` :param s: shell command to execute :param sudo: if True, will execute the command with sudo privileges. Will ask for password and then cache it internally for 5 minutes :param kwargs: keyword arguments to pass to :class:`applyCl`""" # applyCl global _password; import getpass # applyCl if sudo: # applyCl if _password() is None: # applyCl print("Enter password:"); _password.value = getpass.getpass(prompt="") # applyCl return nodeIds | applyCl.aS(lambda: _password() | cli.cmd(f"sudo -S {s}") | cli.deref(), **kwargs) | cli.deref() # applyCl else: return nodeIds | applyCl.aS(lambda: None | cli.cmd(s) | cli.deref(), **kwargs) | cli.deref() # applyCl
[docs] @staticmethod # applyCl def lookup(): # applyCl """Tries to look up a particular file to see which node it's on. Example:: # returns [[nodeId, "something.txt"], [nodeId, "abc.jpg"]] ["something.txt", "abc.jpg"] | applyCl.lookup() # returns [nodeId, "something.txt"] "something.txt" | applyCl.lookup() Files that don't exist won't be included in the result, and files that exist on multiple nodes will be returned multiple times. The output format is such that I can pipe it into applyCl(..., pre=True) and have it execute some function that I want. This is pretty much just a convenience function.""" # applyCl def inner(fns): # applyCl fns = fns | cli.deref(); single = isinstance(fns, str) # applyCl if single: fns = [fns] # applyCl ans = None | applyCl.aS(lambda: fns | cli.iden() & (apply(os.path.expanduser) | apply(os.path.exists)) | cli.transpose() | cli.filt("x", 1) | cli.cut(0) | cli.deref()) | cli.ungroup() | cli.deref() # applyCl return ans[0] if single else ans # applyCl return cli.aS(inner) # applyCl
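# Editor's note -- illustrative sketch, not part of the library source: the docstring above mentions piping lookup() results straight into applyCl(..., pre=True). Assuming a running ray cluster, that could look something like this (file names are hypothetical):
#   ["something.txt", "abc.jpg"] | applyCl.lookup() | applyCl(lambda fn: os.path.getsize(os.path.expanduser(fn)), pre=True) | deref()
# which returns [[nodeId, size], ...], running each size check on the node that actually has the file.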
[docs] @staticmethod # applyCl def replicateFile(fn:str, nodeIds=None): # applyCl """Replicates a specific file in the current node to all the other nodes. Example:: applyCl.replicateFile("~/cron.log") Internally, this will read chunks of 100kB of the specified file and dump them incrementally to all other nodes, which has performance implications. To increase or decrease the chunk size, check out :class:`~k1lib.cli.inp.cat`. This also means you can replicate arbitrarily large files around as long as you have the disk space for it, while ram size doesn't really matter. Please note that this operation is not symmetric. Unlike :meth:`balanceFile` and :meth:`balanceFolder`, which can be invoked on any node and will roughly do the same thing (rebalance everything out), this operation can do totally different things depending on which node you run it on. Let's say the file exists on nodes A and B, but not on nodes C and D. If you run this function on either node A or B, it will replicate the file to C and D. However, if you run this function on node C or D, it will instead throw an error since the file doesn't exist. :param fn: file name :param nodeIds: node ids to copy the file to. If not specified, will copy to every node that doesn't already have a file of the same size""" # applyCl fn = os.path.expanduser(fn); dirname = os.path.dirname(fn) # applyCl # checking if there's an existing file already. If there is, then don't try to copy data to that node # applyCl if nodeIds is None: canSize = os.path.getsize(fn); nodeIds = None | applyCl.aS(lambda: os.path.getsize(fn) if os.path.exists(fn) else 0) | cli.filt(cli.op() != canSize, 1) | cli.cut(0) | cli.deref() # applyCl nodeIds = nodeIds | cli.wrapList().all() | cli.deref() # applyCl nodeIds | cli.insert(None, False).all() | applyCl(lambda _: None | cli.cmd(f"mkdir -p {dirname}; rm -rf {fn}") | cli.deref(), pre=True) | cli.deref() # applyCl for chunk in cli.cat(fn, text=False, chunks=True): nodeIds | cli.insert(chunk, False).all() | applyCl(lambda chunk: chunk >> cli.file(fn) | cli.deref(), pre=True) | cli.deref() # applyCl
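# Editor's note -- illustrative sketch, not part of the library source: the nodeIds parameter above lets you restrict where the copy goes. The node ids below are hypothetical:
#   applyCl.replicateFile("~/cron.log", nodeIds=["1051da...", "7bb387..."])
# This pushes the file only to those two nodes, instead of auto-detecting which nodes are missing it.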
[docs] @staticmethod # applyCl def balanceFile(fn:str, nAs:List[str]=None, nBs:List[str]=None, rS=None, chunkSize:int=100_000_000): # applyCl """Splits a specified file in node nAs and dumps other parts to nodes nBs. Example:: applyCl.balanceFile("~/cron.log") This will split the big files up into multiple segments (1 for each node). Then for each segment, it will read through it chunk by chunk into memory, and then deposits it into the respective nodes. Finally, it truncates the original files down to its segment boundary. The main goal of this is so that you can analyze a single big (say 200GB) file quickly. If that file is on a single node, then it will take forever, even with :class:`applyMp`. So splitting things up on multiple nodes will make analyzing it a lot faster. There's also the function :meth:`balanceFolder`, which has the opposite problem of having lots of small (say 100MB) files. So it will try to move files around (keeping them intact in the meantime) to different nodes so that the folder size ratio is roughly proportional to the cpu count. The exact split rule depends on the number of CPUs of each node. Best to see an example:: Command: applyCl.balanceFile("~/cron.log") Verbose command: applyCl.balanceFile("~/cron.log", ["1"], ["1", "2", "3", "4", "5"]) ----------- Before ----------- Node: 1 2 3 4 5 Cpu: 8 12 16 8 8 Size (GB): 52 0 0 0 0 ----------- After ----------- Node: 1 2 3 4 5 Cpu: 8 12 16 8 8 Size (GB): 8 12 16 8 8 This also works if you have files on existing nodes already, and are upgrading the cluster:: Command: applyCl.balanceFile("~/cron.log") Verbose command: applyCl.balanceFile("~/cron.log", ["1", "5"], ["1", "2", "3", "4", "5"]) ----------- Before ----------- Node: 1 2 3 4 5 Cpu: 8 12 16 8 8 Size (GB): 26 0 0 26 0 ----------- After ----------- Node: 1 2 3 4 5 Cpu: 8 12 16 8 8 Size (GB): 8 12 16 8 8 If you want to move files out of a node when decommissioning them, you can do something like this:: Command: applyCl.decommission("~/cron.log", ["3", "4"]) Verbose command: applyCl.balanceFile("~/cron.log", ["1", "2", "3", "4", "5"], ["1", "2", "5"]) ----------- Before ----------- Node: 1 2 3 4 5 Cpu: 8 12 16 8 8 Size (GB): 8 12 16 8 8 ----------- After ----------- Node: 1 2 3 4 5 Cpu: 8 12 16 8 8 Size (GB): 15 22 0 0 15 Remember that the node ids "1", etc. is for illustrative purposes only. You should get real node ids from :meth:`nodeIds`. Why is the file size proportional to the number of cores on each node? Well, if you have more cores, you should be able to process more, so as to make everything balanced, right? Again, this means that you can split arbitrarily large files as long as you have the disk space for it, ram size is not a concern. How does this perform? Not the best in the world if you don't have a lot of nodes. With sata 3 ssds, 750MB/s ethernet, I got transfer speeds of roughly 100MB/s. This should increase as you have more nodes based on the code structure, but I haven't tested it yet. Can it be faster? Definitely. Am I willing to spend time optimizing it? No. :param fn: file name :param nAs: node ids that currently stores the file. If not specified, try to detect what nodes the file exists in :param nBs: node ids that will store the file after balancing everything out. 
If not specified, will take all available nodes :param rS: :class:`~k1lib.cli.inp.refineSeek` instance, if you need more fine-grained control over section boundaries so as to not make everything corrupted :param chunkSize: see :meth:`balanceFolder` """ # applyCl from k1lib.cli._applyCl import balanceFile # applyCl with settings.cat.context(chunkSize=chunkSize): balanceFile(fn, nAs, nBs, rS) # applyCl
[docs] @staticmethod # applyCl def decommissionFile(fn, nAs:List[str], rS=None, chunkSize:int=100_000_000): # applyCl """Convenience function for :meth:`balanceFile`. See docs over there.""" # applyCl from k1lib.cli._applyCl import balanceFile # applyCl with settings.cat.context(chunkSize=chunkSize): balanceFile(fn, None, applyCl.nodeIds() | ~cli.inSet(nAs) | cli.deref(), rS) # applyCl
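# Editor's note -- illustrative sketch, not part of the library source: moving a distributed file off nodes you're about to retire (node ids hypothetical):
#   applyCl.decommissionFile("~/cron.log", ["nodeId3...", "nodeId4..."])
# which rebalances the file's contents across all the remaining nodes, as described in balanceFile's docs.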
[docs] @staticmethod # applyCl def cat(fn:str=None, f:Callable=None, nodeIds=None, timeout:float=60, pre:bool=False, multiplier:int=1, includeId:bool=False, resolve:bool=True): # applyCl """Reads a file distributedly, does some operation on them, collects and returns all of the data together. Example:: fn = "~/repos/labs/k1lib/k1lib/cli/test/applyCl.cat.data" ("0123456789"*5 + "\\n") * 1000 | file(fn) applyCl.splitFile(fn) applyCl.cat(fn, shape(0), keepNodeIds=True) | deref() That returns something like this (for a 2-node cluster, with 2 (node A) and 4 (node B) cpus respectively):: [['7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d', 167], ['7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d', 167], ['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 166], ['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 167], ['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 166], ['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 167]] Here, we're creating an initial file with 1000 lines. Then we'll split it up into 2 fragments: 334 lines and 667 lines and store them on the respective nodes. Then, on node A, we'll split the file up into 2 parts, each with 167 lines. On node B, we'll split the file up into 4 parts, each with around 166 lines. Then we'll schedule 6 processes total, each dealing with 166 lines. After all of that, results are collected together and returned. If you want to distinguish between different processes inside f, for example you want to write results into different files, you can do something like this:: dir_ = "~/repos/labs/k1lib/k1lib/cli/test" fn = f"{dir_}/applyCl.cat.data" applyCl.cmd(f"rm -r {dir_}/applyCl") # clear out old folders applyCl.cmd(f"mkdir -p {dir_}/applyCl") # creating folders # do processing on fn distributedly, then dump results into multiple files applyCl.cat(fn, ~aS(lambda idx, lines: lines | shape(0) | aS(dill.dumps) | file(f"{dir_}/applyCl/{idx}.pth")), includeId=True) | deref() # reading all files and summing them together None | applyCl.aS(lambda: ls(f"{dir_}/applyCl")) | ungroup() | applyCl(cat(text=False) | aS(dill.loads), pre=True) | cut(1) | toSum() .. admonition:: Simple mode There's also another mode that's activated whenever f is not specified that feels more like vanilla :class:`~k1lib.cli.inp.cat`. Say you have a file on a specific node:: nodeId = "7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d" fn = "~/ssd2/randomFile.txt" # -------------- file is on current node -------------- cat(fn) # returns iterator of lines inside the file fn | cat() # same thing as above # -------------- file is on remote node -------------- [nodeId, fn] | applyCl.cat() # returns iterator of lines of the file applyCl.cat([nodeId, fn]) # same thing nodeId | applyCl.cat(fn) # also same thing So yeah, there're lots of ways to just simply read a file on a remote node. Is it too much? Probably, but good thing is that you can pick any that's intuitive for you. Note that this mode is just for convenience only, for when you want to do exploratory analysis on a single remote file. To be efficient at bulk processing, use the normal mode instead. :param fn: file name :param f: function to execute in every process :param nodeIds: only read file from these nodes :param timeout: kills the processes if it takes longer than this amount of seconds :param pre: "preserve" mode, just like in :class:`applyCl`. 
Whether to keep the node id column or not :param multiplier: by default, each node will spawn as many process as there are cpus. Sometimes you want to spawn more process, change this to a higher number :param includeId: includes a unique id for this process (just normal integers from 0 to n) :param resolve: whether to resolve the remote objects or not """ # applyCl fn = os.path.expanduser(fn) if fn is not None else None # applyCl if f is None: # simple case # applyCl def inner(nodeId_fn:Tuple[str, str]): # applyCl nodeId, fn = nodeId_fn; seeks = [nodeId] | applyCl.aS(lambda: fn | cli.splitSeek(round(os.path.getsize(fn)/settings.cat.chunkSize+1))) | cli.cut(1) | cli.item() | cli.deref() # applyCl inter = seeks | cli.window(2) | apply(cli.wrapList() | cli.insert(nodeId)) | cli.deref() # applyCl return inter | ~applyCl(lambda sB, eB: cli.cat(fn,sB=sB,eB=eB) | cli.deref(), pre=True) | cli.cut(1) | cli.joinStreams() # applyCl # return [nodeId_fn] | applyCl(cat() | deref(), pre=True) | cut(1) | item() # direct, no chunking method # applyCl if fn is None: return aS(inner) # [nodeId, fn] | applyCl.cat() # applyCl if isinstance(fn, str): return aS(lambda nodeId: inner([nodeId, fn])) # nodeId | applyCl.cat() # applyCl else: return inner(fn) # applyCl.cat([nodeId, fn]) # applyCl nodeIds = nodeIds or (applyCl.nodeIds() | applyCl.aS(lambda: os.path.exists(fn)) | cli.filt(cli.op(), 1) | cli.cut(0) | cli.deref()) # applyCl checkpoints = nodeIds | applyCl.aS(lambda: fn | cli.splitSeek(int(applyCl.meta()["Resources"]["CPU"]*multiplier)) | cli.window(2) | cli.deref()) | cli.ungroup() | cli.insertIdColumn(True, False) | ~apply(lambda x,y,z: [x,[*y,z]]) | cli.deref() # applyCl return checkpoints | applyCl(~aS(lambda x,y,idx: cli.cat(fn, sB=x, eB=y) | ((cli.wrapList() | cli.insert(idx)) if includeId else cli.iden()) | f), pre=True, timeout=timeout, num_cpus=1, resolve=resolve) | (cli.iden() if pre else cli.cut(1)) # applyCl
[docs] @staticmethod # applyCl def replicateFolder(folder:str, nodeIds=None): # applyCl """Replicates a specific folder in the current node to all the other nodes. Example:: applyCl.replicateFolder("~/ssd2/data/owl") This just lists out all files recursively in the specified folder, then replicates each file using :meth:`replicateFile`""" # applyCl applyCl.getFilesInFolder(folder) | applyCl(lambda fn: applyCl.replicateFile(fn, nodeIds), num_cpus=0.1) | cli.deref() # applyCl
[docs] @staticmethod # applyCl def balanceFolder(folder:str, maxSteps:int=None, audit:bool=False, bs:int=5, chunkSize:int=100_000_000): # applyCl """Balances all files within a folder across all nodes. Example:: # make the chunk size huge so that transfers become faster settings.cli.cat.chunkSize = 100_000_000 base = "~/repos/labs/k1lib/k1lib/cli/test/applyCl.balance" # deletes old structures and makes the test folder applyCl.cmd(f"rm -r {base}"); applyCl.cmd(f"mkdir -p {base}") # creates 20 files of different sizes and dumps them in the base folder of the current node torch.linspace(1e4, 1e5, 20).int() | apply(lambda x: "x"*x) | insertIdColumn() | ~apply(lambda idx, contents: contents | file(f"{base}/{idx}.txt")) | deref(); # transfers files between nodes such that the total folder size is proportional to the number of cpus across nodes applyCl.balanceFolder(base) # get folder size of all nodes None | applyCl.aS(lambda: ls(base) | apply(os.path.getsize) | toSum()) | deref() # creates 20 additional files and dumps them to the current node torch.linspace(1e4, 1e5, 20).int() | apply(lambda x: "x"*x) | insertIdColumn() | ~apply(lambda idx, contents: contents | file(f"{base}/{idx+20}.txt")) | deref(); # balances the tree out again applyCl.balanceFolder(base) # get folder size of all nodes None | applyCl.aS(lambda: ls(base) | apply(os.path.getsize) | toSum()) | deref() So imagine that you just downloaded 1000 files to a single node into a specific folder, but you need to analyze all of them in a distributed manner. What you can do is move some files to other nodes and then do your analysis. If you want to download more files, just dump them to any node (or download distributed across all nodes), then rebalance the folders and do your analysis. Also, internally, it splits files into multiple chunks, transfers the chunks to other nodes and appends them to the correct files. It uses :meth:`~k1lib.cli.inp.cat` to split up the file, which has settings under ``settings.cli.cat``. By default, the chunk size is 100k bytes, which I think is the sweet spot because :meth:`~k1lib.cli.inp.cat` also supports remote files accessed from the internet and sometimes the library is used on systems with very little memory. But for this use case where you already have the insane hardware for this, 100kB is extremely small and will slow transfer rates to a crawl, so in this function, it will temporarily be set to the parameter ``chunkSize``, which is 100MB by default. :param folder: folder to rebalance all of the files :param maxSteps: what's the maximum number of file transfers? By default has no limit, so that files are transferred until the folder sizes are balanced out :param audit: if True, don't actually move files around and just return what files are going to be moved where :param bs: batch size for transporting this many files at once. Increase to make it faster, but with the penalty of the progress bar not updating as frequently :param chunkSize: file chunk size to split up and send to other nodes """ # applyCl from k1lib.cli._applyCl import balanceFolder # applyCl with settings.cat.context(chunkSize=chunkSize): return balanceFolder(folder, audit, maxSteps, bs=bs) # applyCl
[docs] def decommissionFolder(folder:str, nAs:List[str], maxSteps:int=10000, audit:bool=False, timeout:float=3600, bs:int=5, chunkSize:int=100_000_000): # applyCl """Like :meth:`decommissionFile`, but works for distributed folders instead. :param nAs: list of node ids to migrate files away from :param maxSteps: limits the total number of optimization steps. Normally don't have to specify, but just here in case it runs for too long trying to optimize the folder structure :param audit: if True, just returns the file movements it's planning to do :param bs: batch size for transporting this many files at once. Increase to make it faster, but with the penalty of the progress bar not updating as frequently :param chunkSize: see :meth:`balanceFolder` """ # applyCl from k1lib.cli._applyCl import decommissionFolder # applyCl with settings.cat.context(chunkSize=chunkSize): return decommissionFolder(folder, nAs, audit=audit, maxSteps=maxSteps, timeout=timeout, bs=bs) # applyCl
[docs] @staticmethod # applyCl def pruneFolder(folder): # applyCl """Removes empty directories recursively from a root folder, on all nodes.""" # applyCl def inner(folder): # applyCl folder = os.path.expanduser(folder) # applyCl dirs, files = folder | cli.ls() | cli.filt(os.path.isdir).split() | cli.deref() # applyCl if len(files) > 0: return # applyCl dirs | apply(inner) | cli.ignore() # recurse into subfolders first, so that empty parents can be removed afterwards # applyCl if folder | cli.ls() | cli.shape(0) == 0: None | cli.cmd(f"rm -rf {folder}") | cli.ignore() # applyCl None | applyCl.aS(lambda: inner(folder)) | cli.deref() # applyCl
[docs] @staticmethod # applyCl def diskScan(folder:str, raw=False, accurate=True, f=None): # applyCl """Scans for files and folders in the specified folder for potential distributed files and folders. A distributed file is a file that exists on more than 1 node. A distributed folder is a folder that that exists on more than 1 node and does not have any shared children. Example:: applyCl.diskScan("~/ssd2") applyCl.diskScan("~/ssd2", True) The first line does not return anything, but will print out something like this: .. include:: ../literals/diskScan.rst While the second line will return a parseable data structure instead:: [[['/home/kelvin/ssd2/data/genome/RegulationFeatureActivity', [4113489746, 7912834090, 4164314316]], ['/home/kelvin/ssd2/data/genome/go/release_geneontology_org', [2071645117, 4172737915, 2107005131]], ['/home/kelvin/ssd2/data/genome/RegulationFeatureActivity.backup', [568878496, 552888466, 600610083]], ['/home/kelvin/ssd2/data/genome/00-common_all.idx', [341738564, 671136833, 0]], ['/home/kelvin/ssd2/data/genome/genbank/ch1.dat.gz', [25356744, 0, 25356764]], ['/home/kelvin/ssd2/test', [136152, 273530, 136351]], ['/home/kelvin/ssd2/data/genome/genbank/ch1', [0, 0, 0]]], [['/home/kelvin/ssd2/data/genome/dummy.txt', [1101, 1101, 1101]]], [['/home/kelvin/ssd2/data/genome/00-All.vcf', [32737509360, 65475018903, 32737509588]], ['/home/kelvin/ssd2/data/genome/MotifFeatures/homo_sapiens.GRCh38.motif_features.gff', [13963854962, 27927709895, 13963854962]], ['/home/kelvin/ssd2/data/genome/00-common_all.vcf', [2353901811, 4707803470, 2353901831]]]] Remember that since an operating system usually have lots of shared files (like "~/.bashrc", for example), these might be mistaken as a distributed file. Make sure to only scan folders that you store data in, or else it'll take a long time to return. :param folder: the folder to scan through :param raw: whether to return raw data or display it out nicely :param accurate: if True, returns size when you read all files into RAM. If False returns size occupied by the entire file/folder (will be larger because files are arranged into different blocks in the underlying disk) :param f: optional post process function applied after getting the raw results, if ``raw=False``""" # applyCl from k1lib.cli._applyCl import diskScan4, diskScan5 # applyCl if raw: return diskScan4(folder, accurate=accurate) # applyCl else: return diskScan5(folder, accurate=accurate, f=(f or cli.iden())) # applyCl
[docs] @staticmethod # applyCl def balancedNodeIds(): # applyCl """Returns a stream of node ids that's balanced based on cpu count/performance. Example:: # returns list of 10 node ids: ["abc...", "def...", "abc...", ...] applyCl.balancedNodeIds() | head() | deref() """ # applyCl from k1lib.cli._applyCl import balancedNodeIds # applyCl return balancedNodeIds() # applyCl
[docs] @staticmethod # applyCl def balancedCpus(): # applyCl """Returns Dict[nodeId (str) -> #cpu (int)]. Could be useful to know how much to split up files and folders according to your custom rules. Example:: # returns {"abc...": 8, "def...": 19, "ghi...": 88} for 7700 (4c8t), 10700k (8c16t) and 13900k (24c32t) applyCl.balancedCpus() """ # applyCl from k1lib.cli._applyCl import loadTestGuard # applyCl return loadTestGuard() # applyCl
[docs] @staticmethod # applyCl def loadTest(): # applyCl """Performs a load test on the cluster. Example:: applyCl.loadTest() What is a load test? It basically tries to perform some intensive and long-running calculations on all processes on all nodes in the cluster to know how good each individual node is. This is useful information because whenever you try to split a file up to form a distributed file, or move files in a folder around to form a distributed folder, the amount of data each node gets is going to be proportional to this performance information. More powerful nodes will have more data to process, so that the total running time across all nodes is going to be roughly the same. But isn't cpu count good enough for this? No, not actually. The i7 7700 has 4 cores, 8 threads, and the i9 13900k has 8 performance cores and 16 efficiency cores, totalling 32 threads. You would suspect the 13900k to be 4x (32/8=4) or 6x (24/4=6) more powerful than the 7700, but it's actually 10x more powerful. The test itself takes around 1-2 minutes, and the test results are going to be saved locally in the folder "~/.k1lib/", so that it can use that info directly in future runs.""" # applyCl from k1lib.cli._applyCl import loadTest # applyCl return loadTest() # applyCl
[docs] @staticmethod # applyCl def getFolderSize(folder:str=None) -> int: # applyCl """Shortcut function to get size of a folder on the current node.""" # applyCl from k1lib.cli._applyCl import getFolderSize # applyCl if folder is None: return getFolderSize # applyCl return folder | getFolderSize # applyCl
[docs] @staticmethod # applyCl def getFilesInFolder(folder:str=None): # applyCl """Shortcut function to recursively list out all files inside a folder on the current node.""" # applyCl from k1lib.cli._applyCl import getFilesInFolder # applyCl if folder is None: return getFilesInFolder # applyCl return folder | getFilesInFolder # applyCl
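# Editor's note -- illustrative sketch, not part of the library source: these two helpers pair naturally with applyCl.aS(), e.g. grabbing folder sizes or file counts on every node (folder path hypothetical):
#   None | applyCl.aS(lambda: applyCl.getFolderSize("~/ssd2/data/owl")) | deref()                   # [[nodeId1, size1], [nodeId2, size2], ...]
#   None | applyCl.aS(lambda: applyCl.getFilesInFolder("~/ssd2/data/owl") | shape(0)) | deref()     # per-node file counts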
if hasRay: # applyCl @ray.remote(num_cpus=0) # applyCl class Storage: # a wrapper for specific objects, kinda like ObjectRef, but it's an ObjectRef in my control. Should not be serialized to every other place # applyCl def __init__(self, v=None): # applyCl self.lockExecute = threading.Lock(); self.lockIncref = threading.Lock(); self.lockDecref = threading.Lock() # applyCl self.nodeId = applyCl.nodeId(); self.d = {}; self.refs = defaultdict(lambda: 0) # applyCl self.autoInc = k1lib.AutoIncrement(prefix="_d") # applyCl self.idx = f"{self.nodeId}_" + f"{random.random()}"[:10] + f"_{time.time()}" # applyCl self.idx2 = f"{random.randint(100, 900)}" # applyCl self.deposit(v) # applyCl def getIdx(self): return self.idx # applyCl def getMeta(self): return [self.nodeId, self.idx, self.idx2] # applyCl def lookup(self, idx:str): return self.d[idx] # applyCl def remove(self, idx:str): del self.d[idx] # applyCl def keys(self): return list(self.d.keys()) # applyCl def incref(self, idx:str): # applyCl # with self.lockIncref: # applyCl if idx is not None: # applyCl # requests.get(f"http://192.168.1.133:8892/increfd/{self.idx2}/{idx}") # applyCl if self.refs[idx] <= 0: raise Exception(f"incref-ing object {self.idx2}/{idx} that has already been deleted") # applyCl self.refs[idx] += 1#; return self # applyCl def decref(self, idx:str): # applyCl # with self.lockDecref: # applyCl if idx is not None: # applyCl self.refs[idx] -= 1; v = self.d[idx] # applyCl if self.refs[idx] == 0: self.remove(idx) # applyCl if self.refs[idx] < 0: raise Exception(f"decref-ing object {self.idx2}/{idx} that no longer exists") # applyCl # requests.get(f"http://192.168.1.133:8892/decrefd/{self.idx2}/{idx}")#; return self # applyCl def deposit(self, v:"1/or1/h1", s:"Storage"=None) -> "idx:str": # injecting s into v if v is a Handle # applyCl if isinstance(v, Handle): v.setStorage(s); v = v.get() # applyCl if isinstance(v, ray.ObjectRef): v = ray.get(v) # applyCl idx = self.autoInc() # applyCl # requests.get(f"http://192.168.1.133:8892/deposit/{self.idx2}/{idx}/{v}"[:100]) # applyCl self.d[idx] = v; self.refs[idx] += 1; return idx # applyCl def execute(self, f, idx:str, idxCtxHandle:str=None) -> "idx:str": # applyCl # requests.get(f"http://192.168.1.133:8892/execute/{self.idx2}/{idx}") # applyCl # if idxCtxHandle: f.__globals__["ctxHandle"] = self.d[idxCtxHandle] # injecting in global variable # applyCl with self.lockExecute: return self.deposit(f(self, self.d[idx], idxCtxHandle)) # executing "pure" f with some argument taken from the storage # applyCl def __getstate__(self): raise Exception("Can't be serialized!") # applyCl else: # applyCl class Storage: pass # applyCl _storages = {} # nodeId -> {idx: int, ss: [Storage]}, idx for current index to yield the storage # applyCl _cpuCounts = {} # nodeId -> int # applyCl _nodeIdsGen = None # applyCl def getStorage(nodeId:str=None, num_gpus=0) -> Storage: # getStorage """Handles creating storage contexts. This is created mainly because it's costly to actually instantiate a new actor (1-2 second/actor!), so this will spawn new storage contexts till the cpu count of each node is reached. 
From then on, it will keep reusing old storage contexts""" # getStorage global _nodeIdsGen; _nodeIdsGen = _nodeIdsGen or applyCl.balancedNodeIds() # getStorage nodeId = nodeId or next(_nodeIdsGen) # getStorage if num_gpus == 0: # getStorage if nodeId not in _storages or nodeId not in _cpuCounts: # getStorage _storages[nodeId] = {"idx": 0, "ss": []} # getStorage _cpuCounts[nodeId] = nodeId | applyCl.aS(lambda: os.cpu_count()) # getStorage if len(_storages[nodeId]["ss"]) < _cpuCounts[nodeId]: # add new storage context # getStorage _storages[nodeId]["ss"].append(specificNode(Storage, nodeId).remote()) # getStorage idx = _storages[nodeId]["idx"]; res = _storages[nodeId]["ss"][idx] # getStorage _storages[nodeId]["idx"] = (idx+1)%_cpuCounts[nodeId]; return res # getStorage else: return specificNode(Storage, nodeId, num_gpus).remote() # getStorage def extractStorage(x): return x.storage if isinstance(x, Handle) else None # extractStorage class Handle: # specific object in storage, pretty much ObjectRef that I'm in control of. Can be serialized to every other place # Handle def __init__(self, storage, idx:str=None, _or=None): # Handle self.storage = storage; self.idx = idx; self._or = _or # Handle self.nodeId, self.storageId, self.idx2 = ray.get(storage.getMeta.remote()); self.weakref = False # whether this Handle should decrement reference count of storage element or not # Handle @staticmethod # Handle def create(v, nodeId:str=None, num_gpus=0) -> "Handle": # creates new storage and put value into it # Handle s = getStorage(nodeId, num_gpus); return Handle(s, ray.get(s.deposit.remote(v, extractStorage(v)))) # Handle def deposit(self, v) -> "Handle": # put new value into this handle's Storage # Handle return Handle(self.storage, ray.get(self.storage.deposit.remote(v, extractStorage(v)))) # Handle def execute(self, f, s:"Storage"=None, kwargs=None) -> "Handle": # f is a normal function. Blocks until finished executing # Handle if kwargs: # Handle kwargs.pop("num_gpus", None) # Handle @ray.remote(**kwargs) # Handle def inner(sto, s): return sto.execute.remote(f, self.idx) # Handle return Handle(self.storage, ray.get(ray.get(inner.remote(self.storage, s)))) # Handle else: return Handle(self.storage, ray.get(self.storage.execute.remote(f, self.idx))) # Handle def block(self) -> "Handle": # block execution and finalize Handle's state # Handle if self._or: self.idx = ray.get(self._or) # Handle return self # Handle def executeAsync(self, f, kwargs=None, idxCtxHandle:str=None) -> "Handle": # f is a normal function. 
Returns immediately, finalize it by calling .block() # Handle # requests.get(f"https://logs.mlexps.com/async12/{self.idx}/N/{self.idx2}") # Handle if kwargs: # idxCtxHandle is the (optional) string of the background handle object (left arg in pre=True mode), to be dynamically injected into the "ctxHandle" global variable # Handle # i1 = self.idx; i2 = self.idx2 # Handle kwargs.pop("num_gpus", None) # Handle @ray.remote(**kwargs) # Handle def inner(sto): # Handle # requests.get(f"https://logs.mlexps.com/async56/{i1}/N/{i2}") # Handle return ray.get(sto.execute.remote(f, self.idx, idxCtxHandle)) # Handle # requests.get(f"https://logs.mlexps.com/async34/{self.idx}/N/{self.idx2}") # Handle return Handle(self.storage, _or=inner.remote(self.storage)) # Handle else: return Handle(self.storage, _or=self.storage.execute.remote(f, self.idx, idxCtxHandle)) # Handle def get(self): return ray.get(self.storage.lookup.remote(self.idx)) # Handle def setStorage(self, s): # inject storage dependency into this handle, increment storage's refcount # Handle if s: self.storage = s; self.storage.incref.remote(self.idx); self.weakref = False # Handle def __repr__(self): return f"<Handle idx={self.idx} storage={self.storageId}>" # Handle def __getstate__(self): d = dict(self.__dict__); d["storage"] = None; return d # Handle def __setstate__(self, d): self.__dict__.update(d); self.weakref = True # reconstructed Handles don't decrement reference count of variable, because Actors can't be serialized # Handle # def report(self, s): requests.get(f"http://192.168.1.133:8892/{s}/{self.idx2}/{self.idx}") # Handle def __del__(self): # Handle # requests.get(f"http://192.168.1.133:8892/handdel/{self.idx2}/{self.idx}") # Handle # print(f"storage: {self.storage}, nodeId: {self.nodeId}") # Handle if not self.weakref: self.storage.decref.remote(self.idx) # Handle @lru_cache # Handle def storageWarmup(): print("Warming up distributed storage..."); None | applyCl.aS(lambda: os.cpu_count()) | ~apply(lambda x,y: [x]*y) | cli.joinSt() | apply(getStorage) | cli.ignore(); print("Finished warming up") # storageWarmup def storageSize(): return _storages.values() | cli.op()["ss"].all() | cli.joinSt() | apply(lambda s: ray.get(s.keys.remote())) | cli.joinSt() | cli.shape(0) # storageSize thEmptySentinel = object() # storageSize
[docs]class applyTh(BaseCli): # applyTh blurb="Applies a function to all input elements across multiple threads" # applyTh
[docs] def __init__(self, f, prefetch:int=None, timeout:float=5, bs:int=1, sync=True, **kwargs): # applyTh """Kinda like the same as :class:`applyMp`, but executes ``f`` on multiple threads, instead of on multiple processes. Advantages: - Relatively low overhead for thread creation - Fast, if ``f`` is io-bound - Does not have to serialize and deserialize the result, meaning iterators can be exchanged Disadvantages: - Still has thread creation overhead, so it's still recommended to specify ``bs`` - Is slow if ``f`` has to obtain the GIL to be able to do anything All examples from :class:`applyMp` should work perfectly here. :param prefetch: how many results to execute ahead of time :param timeout: kills the thread if it takes longer than this amount :param bs: how much to bunch function calls together :param sync: if True, execute the functions, hang and wait for the result of each operation, then return. Else schedules the thread but does not wait for the result and yields None right away :param kwargs: keyword arguments to be fed to the function""" # applyTh fs = [f]; super().__init__(fs=fs); self.f = fs[0]; self.kwargs = kwargs # applyTh self.prefetch = prefetch or int(1e9); self.timeout = timeout; self.bs = bs; self.sync = sync # applyTh
[docs] def __ror__(self, it): # applyTh if self.bs > 1: # applyTh yield from (it | cli.batched(self.bs, True) | applyTh(apply(self.f), self.prefetch, self.timeout, sync=self.sync) | cli.joinStreams()); return # applyTh datas = deque(); it = iter(it); kwargs = self.kwargs; sync = self.sync # applyTh innerF = fastF(self.f); timeout = self.timeout # applyTh def f(line, wrapper): wrapper.value = innerF(line, **kwargs) # applyTh for _, line in zip(range(self.prefetch), it): # applyTh w = k1lib.Wrapper(thEmptySentinel) # applyTh t = threading.Thread(target=f, args=(line,w)) # applyTh t.start(); datas.append((t, w)) # applyTh for line in it: # applyTh data = datas.popleft() # pop before checking sync to clean up memory # applyTh if sync: # applyTh data[0].join(timeout) # applyTh if data[1].value is thEmptySentinel: # applyTh for data in datas: data[0].join(0.01) # applyTh raise RuntimeError("Thread timed out!") # applyTh yield data[1].value # applyTh else: yield None # applyTh w = k1lib.Wrapper(thEmptySentinel) # applyTh t = threading.Thread(target=f, args=(line,w)) # applyTh t.start(); datas.append((t, w)) # applyTh for i in range(len(datas)): # do it this way so that python can remove threads early, due to ref counting # applyTh data = datas.popleft() # applyTh if sync: # applyTh data[0].join(timeout) # applyTh if data[1].value is thEmptySentinel: # applyTh for data in datas: data[0].join(0.01) # applyTh raise RuntimeError("Thread timed out!") # applyTh yield data[1].value # applyTh else: yield None # applyTh
def _copy(self): return applyTh(self.f, self.prefetch, self.timeout, self.bs, **self.kwargs) # applyTh
[docs] def __invert__(self): # applyTh res = self._copy(); f = fastF(res.f) # applyTh kw = res.kwargs # applyTh res.f = lambda x: f(*x, **kw) # applyTh res.kwargs = {} # applyTh return res # applyTh
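# Editor's note -- illustrative sketch, not part of the library source: applyTh shines on io-bound work, e.g. hitting many urls concurrently (urls and batch size hypothetical):
#   urls | applyTh(lambda u: requests.get(u).status_code, prefetch=16, bs=4) | deref()
# And just like applyMp, the ~ operator unpacks each row into separate arguments:
#   [[1, 2], [3, 4]] | ~applyTh(lambda a, b: a + b) | deref()    # -> [3, 7]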
[docs]class applySerial(BaseCli): # applySerial blurb="Applies a function to an element repeatedly" # applySerial
[docs] def __init__(self, f, *args, **kwargs): # applySerial """Applies a function repeatedly. First yields input iterator ``x``. Then yields ``f(x)``, then ``f(f(x))``, then ``f(f(f(x)))`` and so on. Example:: # returns [2, 4, 8, 16, 32] 2 | applySerial(op()*2) | head(5) | deref() If the result of your operation is an iterator, you might want to :class:`~k1lib.cli.utils.deref` it, like this:: rs = iter(range(8)) | applySerial(rows()[::2]) # returns [0, 2, 4, 6] rs | rows(1) | item() | deref() # returns []. This is because all the elements are taken by the previous deref() rs | item() | deref() # returns [[2, 8], [10, -6], [4, 16], [20, -12]] [2, 8] | ~applySerial(lambda a, b: (a + b, a - b)) | head(4) | deref() rs = iter(range(8)) | applySerial(rows()[::2] | deref()) # returns [0, 2, 4, 6] rs | rows(1) | item() # returns [0, 4] rs | item() # or `next(rs)` # returns [0] rs | item() # or `next(rs)` :param f: function to apply repeatedly""" # applySerial fs = [f]; super().__init__(fs=fs); self.f = fs[0] # applySerial self.unpack = False; self.args = args; self.kwargs = kwargs # applySerial
[docs] def __ror__(self, it): # applySerial f = fastF(self.f) # applySerial if self.unpack: # applySerial it = init.dfGuard(it) # applySerial while True: yield it; it = f(*it, *self.args, **self.kwargs) # applySerial else: # applySerial while True: yield it; it = f(it, *self.args, **self.kwargs) # applySerial
[docs] def __invert__(self): # applySerial ans = applySerial(self.f, *self.args, **self.kwargs) # applySerial ans.unpack = True; return ans # applySerial
def argsort(it, key=None, reverse=False): # argsort if isinstance(it, settings.arrayTypes): return np.argsort(it) # this mode ignores key and reverse! # argsort if key: return sorted(range(len(it)), key=lambda i: key(it[i]), reverse=reverse) # argsort else: return sorted(range(len(it)), key=it.__getitem__, reverse=reverse) # argsort
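# Editor's note -- quick illustration of the helper above (editor's addition):
#   argsort([3, 1, 2])                  # -> [1, 2, 0], the indices that would sort the list
#   argsort([3, 1, 2], reverse=True)    # -> [0, 2, 1]
#   argsort(["bb", "a"], key=len)       # -> [1, 0]
# For numpy/torch arrays it defers to np.argsort and ignores key/reverse, as noted in the code.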
[docs]class sort(BaseCli): # sort blurb="Sorts list/table based on an optional column" # sort
[docs] def __init__(self, column:int=0, numeric=True, reverse=False, unsort=False): # sort """Sorts list/table based on a specific `column`. Example:: # returns [[5, 'a'], [1, 'b']] [[1, "b"], [5, "a"]] | ~sort(0) | deref() # returns [[2, 3]] [[1, "b"], [5, "a"], [2, 3]] | ~sort(1) | deref() # errors out, as you can't really compare str with int [[1, "b"], [2, 3], [5, "a"]] | sort(1, False) | deref() # returns [-1, 2, 3, 5, 8] [2, 5, 3, -1, 8] | sort(None) | deref() .. admonition:: unsort This is how it works:: a = np.array([1, 5, 9, 2, 6, 3, 7, 4, 8]) # returns np.array([1, 5, 9, 2, 6, 3, 7, 4, 8]) a | sort(None, unsort=True) # returns np.array([1, 2, 3, 4, 5, 6, 7, 8, 9]), normal sort a | sort(None) # returns np.array([-3.5, 0.5, 4.5, -2.5, 1.5, -1.5, 2.5, -0.5, 3.5]), sorts, do transformation, then unsort a | (sort(None, unsort=True) | aS(lambda x: x - x[-1]/2)) # returns np.array([12.25, 0.25, 20.25, 6.25, 2.25, 2.25, 6.25, 0.25, 12.25]) a | (sort(None, unsort=True) | aS(lambda x: (x - x[-1]/2)**2)) How this works is that it will sort everything as usual, then it'll execute the captured transformation and then it will unsort everything. This is for scenarios when an operation needs to operate on sorted data, but you still want to keep the original ordering for some reason. :param column: if None, sort rows based on themselves and not an element :param numeric: whether to convert column to float :param reverse: False for smaller to bigger, True for bigger to smaller. Use :meth:`__invert__` to quickly reverse the order instead of using this param :param unsort: whether to sort and then unsort the input or not""" # sort super().__init__(capture=True) # sort self.column = column; self.reverse = reverse; self.numeric = numeric; self.unsort = unsort # sort self.filterF = (lambda x: float(x)) if numeric else (lambda x: str(x)) # sort
def _all_array_opt(self, it, level): # sort if self.unsort: return NotImplemented # too complex to think about right now # sort c = self.column; reverse = self.reverse; p = [slice(None, None, None)]*level; p1 = (*p, slice(None, None, None)); ser = self.capturedSerial # sort if c is None and len(it.shape)-level != 1: raise Exception(f"Expected sort(None) to take in a 1-d array, but the array has shape {it.shape[level:]}") # sort if c is not None and len(it.shape)-level != 2: raise Exception(f"Expected sort(col) to take in a 2-d array, but the array has shape {it.shape[level:]}") # sort bm = np if isinstance(it, np.ndarray) else (torch if (hasTorch and isinstance(it, torch.Tensor)) else None) # sort if bm is not None: # sort if c is None: b = bm.argsort(it); b = bm.flip(b, (level,)) if reverse else b; return bm.gather(it, level, b) | ser.all(level) # sort else: b = bm.argsort(it[(*p1, c)]); b = bm.flip(b, (level,)) if reverse else b; return bm.gather(it, level, b[(*p1, None)].expand(it.shape)) | ser.all(level) # sort return NotImplemented # sort
[docs] def __ror__(self, it:Iterator[str]): # sort c = self.column; reverse = self.reverse; unsort = self.unsort; bm = None; ser = self.capturedSerial # sort if hasPandas: # sort if isinstance(it, pd.DataFrame): # sort if c is None: it = init.dfGuard(it) # sort else: # sort s = it[list(it)[c]]; arg = s.argsort(); arg = (arg[::-1] if reverse else arg).to_numpy(); res = it.iloc[arg] | ser # sort if unsort: return res.iloc[argsort(arg)] if isinstance(res, pd.DataFrame) else res | cli.rows(*argsort(arg)) # sort else: return res # sort elif isinstance(it, pd.core.arraylike.OpsMixin): it = it.to_numpy() # sort if isinstance(it, settings.arrayTypes): # sort if c is None and len(it.shape) != 1: raise Exception(f"Expected sort(None) to take in a 1-d array, but the array has shape {it.shape}") # sort if c is not None and len(it.shape) != 2: raise Exception(f"Expected sort(col) to take in a 2-d array, but the array has shape {it.shape}") # sort bm = np if isinstance(it, np.ndarray) else (torch if (hasTorch and isinstance(it, torch.Tensor)) else None) # sort if bm: # sort arg = bm.argsort(it) if c is None else bm.argsort(it[:,c]); arg = bm.flip(arg, (0,)) if reverse else arg # sort return it[arg] | ser | (cli.rows(*argsort(arg)) if self.unsort else cli.iden()) # sort f = self.filterF # sort rows = list(it) if c is None else list((it | cli.isNumeric(c) if self.numeric else it) | cli.apply(list)) # sort def sortF(row): # sort if len(row) > c: return f(row[c]) # sort return float("inf") # sort if self.unsort: # sort arg = argsort(rows, f if c is None else sortF, self.reverse) # sort return rows | cli.rows(*arg) | ser | cli.rows(*argsort(arg)) # sort return sorted(rows, key=f if c is None else sortF, reverse=self.reverse) | ser # sort
[docs] def __invert__(self): # sort """Creates a clone that has the opposite sort order""" # sort return sort(self.column, self.numeric, not self.reverse, self.unsort) # sort
def _jsF(self, meta): # sort fIdx = init._jsFAuto(); dataIdx = init._jsDAuto(); argIdx = init._jsDAuto() # sort if self.unsort: raise Exception("sort._jsF() unsort mode is unavailable, as the cli capture mechanism isn't possible in JS") # sort return f"{fIdx} = ({dataIdx}) => {dataIdx}.ksort({cli.kjs.v(self.column)}, {cli.kjs.v(self.numeric)}, {cli.kjs.v(self.reverse)})", fIdx # sort
[docs]class sortF(BaseCli): # sortF "Sorts list/table using a function" # sortF
[docs] def __init__(self, f:Callable[[Any], float], column:int=None, reverse=False): # sortF """Sorts list/table using a function. Example:: # returns ['a', 'aa', 'aaa', 'aaaa', 'aaaaa'] ["a", "aaa", "aaaaa", "aa", "aaaa"] | sortF(lambda r: len(r)) | deref() # returns ['aaaaa', 'aaaa', 'aaa', 'aa', 'a'] ["a", "aaa", "aaaaa", "aa", "aaaa"] | ~sortF(lambda r: len(r)) | deref()""" # sortF fs = [f]; super().__init__(fs=fs); self.f = fs[0]; self._fC = fastF(self.f) # sortF self.column = column; self.reverse = reverse # sortF
[docs] def __ror__(self, it:Iterator[Any]) -> Iterator[Any]: # sortF c = self.column; f = self._fC # sortF if c is None: # sortF it = init.dfGuard(it) # sortF try: it[:]; len(it) # sortF except: it = list(it) # sortF return sorted(it, key=f, reverse=self.reverse) # sortF if hasPandas and isinstance(it, pd.DataFrame): # sortF col = it[list(it)[c]] # sortF arg = [x[0] for x in sorted([[i, f(x)] for i,x in enumerate(col)], key=lambda x:x[1], reverse=self.reverse)] # sortF return it.iloc[arg] # sortF def sortF(row): # sortF if len(row) > c: return f(row[c]) # sortF return float("inf") # sortF return sorted(list(it), key=sortF, reverse=self.reverse) # sortF
[docs] def __invert__(self) -> "sortF": # sortF return sortF(self.f, self.column, not self.reverse) # sortF
def _jsF(self, meta): # sortF fIdx = init._jsFAuto(); dataIdx = init._jsDAuto(); argIdx = init._jsDAuto() # sortF header, _fIdx, _async = k1lib.kast.asyncGuard(k1lib.kast.prepareFunc3(self.f, ("sortF", meta))) # sortF return f"{header}\n{fIdx} = {'async ' if _async else ''}({dataIdx}) => {{ return {'await ' if _async else ''}{dataIdx}.sortF{'_async' if _async else ''}(({argIdx}) => {_fIdx}({argIdx}), {cli.kjs.v(self.column)}, {cli.kjs.v(self.reverse)}); }}", fIdx # sortF
[docs]class consume(BaseCli): # consume blurb="Consumes the iterator in a side stream and returns the iterator" # consume
[docs] def __init__(self, f:Union[BaseCli, Callable[[Any], None]]): # consume r"""Consumes the iterator in a side stream and returns the iterator. Kinda like the bash command ``tee``. Example:: # prints "0\n1\n2" and returns [0, 1, 2] range(3) | consume(headOut()) | toList() # prints "range(0, 3)" and returns [0, 1, 2] range(3) | consume(lambda it: print(it)) | toList() This is useful whenever you want to mutate something, but don't want to include the function result into the main stream. See also: :class:`~k1lib.cli.output.tee`""" # consume fs = [f]; super().__init__(fs=fs); self.f = fs[0] # consume
[docs] def __ror__(self, it): # consume self.f(it); return it # consume
def batched_randperm(n, l, gen=None): # courtesy of https://discuss.pytorch.org/t/batch-version-of-torch-randperm/111121/3 # batched_randperm if gen is None: return np.argsort(np.random.rand(l, n)) # batched_randperm else: return torch.argsort(torch.rand(l, n, generator=gen), dim=-1) # batched_randperm def randperm(n, gen=None): return batched_randperm(n, 1, gen)[0] # randperm default = 100 # randperm
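# Editor's note -- quick illustration of the helpers above (editor's addition):
#   batched_randperm(5, 3)    # -> array of shape (3, 5); each row is an independent permutation of range(5)
#   randperm(5)               # -> a single permutation of range(5), e.g. [2, 0, 4, 1, 3]
# Passing a torch.Generator as `gen` switches to the torch backend and makes the permutations reproducible.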
[docs]class randomize(BaseCli): # randomize blurb="Randomizes the input elements' order" # randomize
[docs] def __init__(self, bs=default, seed=None): # randomize """Randomize input stream. In order to be efficient, this does not convert the input iterator to a giant list and yield random values from that. Instead, this fetches ``bs`` items at a time, randomizes them, returns and fetch another ``bs`` items. If you want to do the giant list, then just pass in ``float("inf")``, or ``None``. Example:: # returns [0, 1, 2, 3, 4], effectively no randomize at all range(5) | randomize(1) | deref() # returns something like this: [1, 0, 2, 3, 5, 4, 6, 8, 7, 9]. You can clearly see the batches range(10) | randomize(3) | deref() # returns something like this: [7, 0, 5, 2, 4, 9, 6, 3, 1, 8] range(10) | randomize(float("inf")) | deref() # same as above range(10) | randomize(None) | deref() # returns True, as the seed is the same range(10) | randomize(seed=4) | deref() == range(10) | randomize(seed=4) | deref() Note that if ``seed=True``, then it will randomize all input iterators the same way and independently of each other. Meaning:: r = randomize(seed=42) range(10) | r | deref() # returns [6, 9, 1, 2, 0, 8, 3, 5, 4, 7] range(10) | r | deref() # also returns [6, 9, 1, 2, 0, 8, 3, 5, 4, 7] This may or may not be desireable, but I think it's desirable. :param bs: batch size :param seed: if specified, will always randomize the input iterator in the same way""" # randomize self.bs = bs if bs != None else float("inf") # randomize self.seed = seed # randomize if seed is not None and not hasTorch: raise Exception("Seeded randomize() depends on PyTorch. Please install it first") # randomize
def _newTorchGen(self): return torch.Generator().manual_seed(random.Random(self.seed).getrandbits(63)) # randomize def _newGenn(self): # randomize if self.seed is None: return randperm # randomize gen = self._newTorchGen(); return lambda n: randperm(n, gen) # randomize def _newGenn2(self): # randomize if self.seed is None: return batched_randperm # randomize gen = self._newTorchGen(); return lambda n, l: batched_randperm(n, l, gen) # randomize def _all_array_opt(self, it, level): # randomize perms = self._newGenn2()(it.shape[level], np.prod(it.shape[:level])) # randomize b = it | cli.joinSt(level-1); return b[np.arange(len(b))[:, None], perms].reshape(it.shape) # randomize
[docs] def __ror__(self, it:Iterator[Any]) -> Iterator[Any]: # randomize bs = self.bs # randomize if isinstance(it, settings.arrayTypes): # randomize if bs is default or bs is None or len(it) <= bs: return it if len(it) == 1 else it[self._newGenn()(len(it))] # randomize if hasPandas and isinstance(it, pd.DataFrame): # randomize if bs is default or bs is None or len(it) <= bs: return it.iloc[self._newGenn()(len(it))] # randomize if bs is default: bs = 100 # randomize def gen(): # randomize genn = self._newGenn() # randomize for batch in it | cli.batched(bs, True): # randomize batch = list(batch); perms = genn(len(batch)) # randomize for idx in perms: yield batch[idx] # randomize return gen() # randomize
def _jsF(self, meta): # randomize fIdx = init._jsFAuto(); dataIdx = init._jsDAuto() # randomize return f"{fIdx} = ({dataIdx}) => {dataIdx}.randomize({cli.kjs.v(self.seed)})", fIdx # randomize
class StaggeredStream: # StaggeredStream def __init__(self, stream:Iterator[Any], every:int): # StaggeredStream """Not intended to be instantiated by the end user. Use :class:`stagger` instead.""" # StaggeredStream self.stream = stream; self.every = every # StaggeredStream def __iter__(self): # StaggeredStream for i, v in zip(range(self.every), self.stream): yield v # StaggeredStream def __len__(self): # StaggeredStream """Length of window (length of result if you were to deref it).""" # StaggeredStream return self.every # StaggeredStream
[docs]class stagger(BaseCli): # stagger blurb='Staggers input stream into multiple stream "windows" placed serially' # stagger
[docs] def __init__(self, every:int): # stagger """Staggers input stream into multiple stream "windows" placed serially. Best explained with an example:: o = range(10) | stagger(3) o | deref() # returns [0, 1, 2], 1st "window" o | deref() # returns [3, 4, 5], 2nd "window" o | deref() # returns [6, 7, 8] o | deref() # returns [9] o | deref() # returns [] This might be useful when you're constructing a data loader:: dataset = [range(20), range(30, 50)] | transpose() dl = dataset | batched(3) | (transpose() | toTensor()).all() | stagger(4) for epoch in range(3): for xb, yb in dl: # looping over a window print(epoch) # then something like: model(xb) The above code will print 6 lines. 4 of them is "0" (because we stagger every 4 batches), and xb's shape' will be (3,) (because we batched every 3 samples). You should also keep in mind that this doesn't really change the property of the stream itself. Essentially, treat these pairs of statement as being the same thing:: o = range(11, 100) # both returns 11 o | stagger(20) | item() o | item() # both returns [11, 12, ..., 20] o | head(10) | deref() o | stagger(20) | head(10) | deref() Lastly, multiple iterators might be getting values from the same stream window, meaning:: o = range(11, 100) | stagger(10) it1 = iter(o); it2 = iter(o) next(it1) # returns 11 next(it2) # returns 12 This may or may not be desirable. Also this should be obvious, but I want to mention this in case it's not clear to you.""" # stagger self.every = int(every) # stagger
[docs] def __ror__(self, it:Iterator[Any]) -> StaggeredStream: # stagger return StaggeredStream(iter(init.dfGuard(it)), self.every) # stagger
[docs] @staticmethod # stagger def tv(every:int, ratio:float=0.8): # stagger """Convenience method to quickly stagger train and valid datasets. Example:: # returns [[16], [4]] [range(100)]*2 | stagger.tv(20) | shape().all() | deref()""" # stagger return stagger(round(every*ratio)) + stagger(round(every*(1-ratio))) # stagger
compareOps = {"__lt__", "__le__", "__eq__", "__ne__", "__gt__", "__ge__"} # stagger
[docs]class op(k1lib.Absorber, BaseCli): # op blurb="Shorthand for lambda functions" # op
[docs] def __init__(self): # op """Absorbs operations done on it and applies it on the stream. Based on :class:`~k1lib.Absorber`. Example:: # returns 16 4 | op()**2 # returns 16, equivalent to the above 4 | aS(lambda x: x**2) # returns [0, 1, 4, 9, 16] range(5) | apply(op()**2) | deref() # returns [0, 1, 4, 9, 16], equivalent to the above range(5) | apply(lambda x: x**2) | deref() Main advantage is that you don't have to waste keystrokes when you just want to do a simple operation. How it works underneath is a little magical, so just treat it as a blackbox. A more complex example:: t = torch.tensor([[1, 2, 3], [4, 5, 6.0]]) # returns [torch.tensor([[4., 5., 6., 7., 8., 9.]])] [t] | (op() + 3).view(1, -1).all() | deref() Basically, you can treat ``op()`` as the input tensor. Tbh, you can do the same thing with this:: [t] | applyS(lambda t: (t+3).view(-1, 1)).all() | deref() But that's kinda long and may not be obvious. This can be surprisingly resilient, as you can still combine with other cli tools as usual, for example:: # returns [2, 3], demonstrating "&" operator torch.randn(2, 3) | (op().shape & iden()) | deref() | item() a = torch.tensor([[1, 2, 3], [7, 8, 9]]) # returns torch.tensor([4, 5, 6]), demonstrating "+" operator for clis and not clis (a | op() + 3 + iden() | item() == torch.tensor([4, 5, 6])).all() # returns [[3], [3]], demonstrating .all() and "|" serial chaining torch.randn(2, 3) | (op().shape.all() | deref()) # returns [[8, 18], [9, 19]], demonstrating you can treat `op()` as a regular function [range(10), range(10, 20)] | transpose() | filt(op() > 7, 0) | deref() # returns [3, 4, 5, 6, 7, 8, 9], demonstrating bounds comparison range(100) | filt(3 <= op() < 10) | deref() This can only deal with simple operations only. For complex operations, resort to the longer version ``aS(lambda x: ...)`` instead! There are also operations that are difficult to achieve, like ``len(op())``, as Python is expecting an integer output, so ``op()`` can't exactly take over. Instead, you have to use :class:`aS`, or do ``op().ab_len()``. Get a list of all of these special operations in the source of :class:`~k1lib.Absorber`. Performance-wise, in most cases, there are no degradation, so don't worry about it. Everything is pretty much on par with native lambdas:: n = 10_000_000 # takes 1.48s for i in range(n): i**2 # takes 1.89s, 1.28x worse than for loop range(n) | apply(lambda x: x**2) | ignore() # takes 1.86s, 1.26x worse than for loop range(n) | apply(op()**2) | ignore() # takes 1.86s range(n) | (op()**2).all() | ignore() More complex operations still retains the same speeds, as there's a JIT compiler embedded in:: # takes 2.15s for i in range(n): (i**2-3)*0.1 # takes 2.53s, 1.18x worse than for loop range(n) | apply(lambda x: (x**2-3)*0.1) | ignore() # takes 2.46s, 1.14x worse than for loop range(n) | apply((op()**2-3)*0.1) | ignore() Reserved operations that are not absorbed are: - all - __ror__ (__or__ still works!) - ab_solidify - op_hint""" # op super().__init__({"_hint": None}) # op
[docs] @staticmethod # op def solidify(f): # op """Static equivalent of ``a.ab_solidify()``. Example:: f = op()**2 f = op.solidify(f) If ``f`` is not an ``op``, then just return it without doing anything to it""" # op if f.__class__.__name__.split(".")[-1] == "op": f.ab_solidify() # op return f # op
[docs] def __ror__(self, it): # op return self.ab_operate(it) # op
def __or__(self, o): # op if isinstance(o, BaseCli): return super(k1lib.Absorber, self).__or__(o) # op return super().__add__(o) # op def __add__(self, o): # op if isinstance(o, BaseCli): return super(k1lib.Absorber, self).__add__(o) # op return super().__add__(o) # op def __and__(self, o): # op if isinstance(o, BaseCli): return super(k1lib.Absorber, self).__and__(o) # op return super().__and__(o) # op def __call__(self, *args, **kwargs): # op if self._ab_solidified: return self.ab_operate(*args, **kwargs) # op return super().__call__(*args, **kwargs) # op def _typehint(self, inp): # op return self._hint if self._hint is not None else tAny() # op
[docs] def op_hint(self, _hint): # op """Specify output type hint""" # op self._ab_sentinel = True; self._hint = _hint # op self._ab_sentinel = False; return self # op
def _jsF(self, meta): return cli.aS(self)._jsF(meta) # op
cli.op = op # op
[docs]class integrate(BaseCli): # integrate blurb="Calculates the cumulative sum of the input" # integrate
[docs] def __init__(self, col=None, dt=1): # integrate """Integrates the input. Example:: # returns [0, 1, 3, 6, 10, 15, 21, 28, 36, 45] range(10) | integrate() | deref() # returns [0, 2, 6, 12, 20, 30, 42, 56, 72, 90] range(10) | integrate(dt=2) | deref() # returns [['a', 0], ['b', 1], ['c', 4], ['d', 10]] [["a", 0], ["b", 1], ["c", 3], ["d", 6]] | integrate(1) | deref() # returns [['a', 0], ['b', 2], ['c', 8], ['d', 20]] [["a", 0], ["b", 1], ["c", 3], ["d", 6]] | integrate(1, dt=2) | deref() :param col: column to integrate over :param dt: optional step size, or delta time""" # integrate self.col = col; self.dt = dt # integrate
[docs] def __ror__(self, it): # integrate it = init.dfGuard(it) # integrate if self.col is None: # integrate if self.dt == 1: # integrate s = 0 # integrate for e in it: s += e; yield s # integrate else: # integrate dt = self.dt; s = 0 # integrate for e in it: s += e*dt; yield s # integrate else: # integrate col = self.col # integrate if self.dt == 1: # integrate s = 0 # integrate for e in it: s += e[col]; e[col] = s; yield e # integrate else: # integrate dt = self.dt; s = 0 # integrate for e in it: s += e[col]*dt; e[col] = s; yield e # integrate
[docs]class roll(BaseCli): # roll blurb="Rolls the input by some amount of shift" # roll
[docs] def __init__(self, shift:int): # roll """Rolls the input some amount of shift. Example:: # returns [7, 8, 9, 0, 1, 2, 3, 4, 5, 6] range(10) | roll(3) :param shift: shift amount""" # roll self.shift = shift # roll
def _all_array_opt(self, it, level): # roll if isinstance(it, np.ndarray): return np.roll(it, self.shift, level) # roll if hasTorch and isinstance(it, torch.Tensor): return torch.roll(it, self.shift, level) # roll def _all_opt2(self): return NotImplemented # should not do this optimization, this is not dimension-agnostic! # roll
[docs] def __ror__(self, it): # roll shift = self.shift # roll if isinstance(it, np.ndarray): return np.roll(it, shift, 0) # roll if hasTorch and isinstance(it, torch.Tensor): return torch.roll(it, shift, 0) # roll if hasPandas: # roll arg = np.roll(np.arange(len(it)), shift, 0) # roll if isinstance(it, pd.DataFrame): return it.iloc[arg] # roll if isinstance(it, pd.core.arraylike.OpsMixin): return it[arg] # roll it = init.dfGuard(it) # roll try: it[0]; len(it) # roll except: it = list(it) # roll return [*it[-shift:], *it[:-shift]] # roll
class clamp(BaseCli): # clamp
    blurb="Clamps input list/array between 2 values" # clamp
    def __init__(self, col=None, min=0, max=1, std=None): # clamp
        """Clamps input list/array between 2 values. Example::

            # returns [3, 3, 3, 3, 4, 5, 6, 7, 7, 7]
            range(10) | clamp(None, 3, 7) | deref()
            # clamps the 1st column (0-indexed!) between 0 and 2
            np.random.randn(10, 3) | clamp(1, 0, 2)

        This cli has 2 modes: absolute mode and std mode. Absolute mode clamps
        between .min and .max, and is active while .std is left alone (aka None).
        Std mode is activated when you specify a .std value. It then calculates the
        mean and std of the incoming data and derives the min and max values from those.

        :param col: column to clamp
        :param min: lower bound, used in absolute mode
        :param max: upper bound, used in absolute mode
        :param std: if specified, switches to std mode and clamps to within this many standard deviations of the mean""" # clamp
        self.col = col; self.min = min; self.max = max; self.std = std # clamp
    def _all_array_opt(self, it, level): # this was pretty painful, I have to admit! # clamp
        n = len(it.shape); col = self.col; std = self.std # clamp
        if col is None: # clamp
            if self.std is not None: # clamp
                it = np.copy(it) if isinstance(it, np.ndarray) else torch.clone(it) # clamp
                b = it | cli.joinSt(n-level-1).all(level) # clamp
                min_ = (b.mean(level) - std*b.std(level)) | cli.repeat(b.shape[-1]).all(level) # clamp
                max_ = (b.mean(level) + std*b.std(level)) | cli.repeat(b.shape[-1]).all(level) # clamp
                b[b < min_] = min_[b < min_]; b[b > max_] = max_[b > max_]; return b.reshape(it.shape) # clamp
            else: # clamp
                if isinstance(it, np.ndarray): return np.clip(it, self.min, self.max) # clamp
                if hasTorch and isinstance(it, torch.Tensor): return torch.clamp(it, self.min, self.max) # clamp
        else: # clamp
            if self.std is not None: # clamp
                it = np.copy(it) if isinstance(it, np.ndarray) else torch.clone(it) # clamp
                b = it[(*[slice(None,None,None)]*level,col)] | cli.joinSt(n-level-2).all(level) # clamp
                min_ = (b.mean(level) - std*b.std(level)) | cli.repeat(b.shape[-1]).all(level) # clamp
                max_ = (b.mean(level) + std*b.std(level)) | cli.repeat(b.shape[-1]).all(level) # clamp
                b[b < min_] = min_[b < min_]; b[b > max_] = max_[b > max_]; return it # clamp
            else: # clamp
                it = np.copy(it) if isinstance(it, np.ndarray) else torch.clone(it) # clamp
                it[(*[slice(None,None,None)]*level,col)] = (np.clip if isinstance(it, np.ndarray) else torch.clamp)(it[(*[slice(None,None,None)]*level,col)], self.min, self.max); return it # clamp
        return NotImplemented # clamp
    def __ror__(self, it): # clamp
        col = self.col; min_ = self.min; max_ = self.max; std = self.std # clamp
        if isinstance(it, np.ndarray): # clamp
            if col is None: # clamp
                if std is None: return np.clip(it, min_, max_) # clamp
                m = it.mean(); s = it.std(); return np.clip(it, m-s*std, m+s*std) # clamp
            else: # clamp
                a = np.copy(it); c = a[:,col] # clamp
                if std is None: a[:,col] = np.clip(c, min_, max_) # clamp
                else: m = c.mean(); s = c.std(); a[:,col] = np.clip(c, m-s*std, m+s*std) # clamp
                return a # clamp
        if hasTorch and isinstance(it, torch.Tensor): # clamp
            if col is None: # clamp
                if std is None: return torch.clamp(it, min_, max_) # clamp
                m = it.mean(); s = it.std(); return torch.clamp(it, m-s*std, m+s*std) # clamp
            else: # clamp
                a = torch.clone(it); c = a[:,col] # clamp
                if std is None: a[:,col] = torch.clamp(c, min_, max_) # clamp
                else: m = c.mean(); s = c.std(); a[:,col] = torch.clamp(c, m-s*std, m+s*std) # clamp
                return a # clamp
        if hasPandas and isinstance(it, pd.DataFrame): # clamp
            if col is None: it = init.dfGuard(it) # clamp
            else: # clamp
                c = it[list(it)[col]] # clamp
                if std is None: c = np.clip(c, min_, max_) # clamp
                else: m = c.mean(); s = c.std(); c = np.clip(c, m-s*std, m+s*std) # clamp
                return it.replaceCol(list(it)[col], c) # clamp
        return it | cli.apply(lambda x: max(min(x, max_), min_), self.col) # TODO: needs fixing # clamp
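# Illustrative sketch (not part of the original notebook source): in std mode the clamping
# bounds are derived from the data itself (mean +/- std * the given value) rather than from
# the fixed .min/.max values. Assumes only the definitions above plus `deref` from k1lib.cli;
# the helper name is made up.
def _clamp_usage_sketch():
    data = np.random.randn(1000)
    out = data | clamp(None, std=2)                  # clamp to mean +/- 2*std of `data`
    assert out.min() >= data.mean() - 2*data.std()
    assert out.max() <= data.mean() + 2*data.std()
    # absolute mode on a plain iterable goes through the generic fallback at the end of __ror__
    assert (range(10) | clamp(None, 3, 7) | cli.deref()) == [3, 3, 3, 3, 4, 5, 6, 7, 7, 7]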