Source code for k1lib.cli.output

# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
"""
For operations that feel like the termination of operations
"""
from collections import defaultdict
from typing import Iterator, Any
from k1lib.cli.init import BaseCli; import k1lib.cli.init as init
import numbers, numpy as np, k1lib, tempfile, os, sys, time, math, json, re
from k1lib import cli; from k1lib.cli.typehint import *
plt = k1lib.dep.plt
try: import torch; hasTorch = True
except: hasTorch = False
__all__ = ["stdout", "tee", "file", "pretty", "unpretty", "display", "headOut",
           "intercept", "plotImgs"]
settings = k1lib.settings.cli
class stdout(BaseCli):
    def __init__(self):
        """Prints out all lines. If not iterable, then print out the input raw.
        Example::

            # prints out "0\\n1\\n2"
            range(3) | stdout()
            # same as above, but (maybe?) more familiar
            range(3) > stdout()

        This is rarely used alone. It's more common to use :meth:`headOut`
        for list of items, and :meth:`display` for tables."""
        super().__init__()
    def _typehint(self, inp): return None
    def __ror__(self, it:Iterator[str]):
        """Prints every element of ``it`` on its own line, or ``it`` itself
        if it can't be iterated over.

        :param it: anything; iterables are printed element by element"""
        # Only the iter() call is guarded. A TypeError raised while the
        # elements are being produced is a genuine error in the upstream
        # pipeline and must not be mistaken for "input is not iterable".
        try: lines = iter(it)
        except TypeError: print(it); return
        for line in lines: print(line)
_defaultTeeF = lambda s: f"{s}\n" # stdout
class tee(BaseCli):
    def __init__(self, f=_defaultTeeF, s=None, every:int=1, delay:float=0):
        """Like the Linux ``tee`` command, this prints the elements to another
        specified stream, while yielding the elements. Example::

            # prints "0) 0\\n1) 1\\n2) 2\\n3) 3\\n4) 4\\n" and returns [0, 1, 4, 9, 16]
            range(5) | tee() | apply(op() ** 2) | deref()

        See also: :class:`~k1lib.cli.modifier.consume`

        This cli is not exactly well-thoughout and is a little janky

        :param f: element transform function. Defaults to just adding a new
            line at the end
        :param s: stream to write to. Defaults to :attr:`sys.stdout`
        :param every: only prints out 1 line in ``every`` lines, to limit print rate
        :param delay: if subsequent prints are less than this number of seconds
            apart then don't print them"""
        self.s = s or sys.stdout; self.f = f; self.every = every; self.delay = delay
    def __ror__(self, it):
        """Yields every element of ``it`` unchanged, printing the transformed
        element to the configured stream whenever the ``every`` and ``delay``
        conditions allow it."""
        s = self.s; f = self.f; every = self.every; delay = self.delay
        lastTime = 0
        for i, e in enumerate(init.dfGuard(it)):
            if i % every == 0 and time.time()-lastTime > delay:
                print(f" \r{i}) {f(e)}", end="", file=s); lastTime = time.time()
            yield e
    def cr(self):
        """Tee, but replaces the previous line. "cr" stands for carriage return.
        Example::

            # prints "4" and returns [0, 1, 4, 9, 16]. Does print all the numbers in the middle, but is overriden
            range(5) | tee().cr() | apply(op() ** 2) | deref()"""
        # the default transform appends a newline, which would defeat the overwrite
        f = (lambda x: x) if self.f == _defaultTeeF else self.f
        self.f = lambda s: f"{f(s)}"; return self
    def crt(self):
        """Like :meth:`tee.cr`, but includes an elapsed time text at the end.
        Example::

            range(5) | tee().crt() | apply(op() ** 2) | deref()"""
        beginTime = time.time(); every = self.every; autoInc = k1lib.AutoIncrement()
        f = (lambda x: x) if self.f == _defaultTeeF else self.f
        def timed(s):
            text = f(s); elapsed = time.time() - beginTime
            # clamp: a coarse clock can report zero elapsed time for the very
            # first elements, which used to raise ZeroDivisionError
            rate = autoInc()*every/max(elapsed, 1e-9)
            return f"{text}, {int(elapsed)}s elapsed, throughput: {k1lib.fmt.throughput(rate)}"
        self.f = timed; return self
    def autoInc(self):
        """Like :meth:`tee.crt`, but instead of printing the object, just print
        the current index and time"""
        beginTime = time.time(); autoInc = k1lib.AutoIncrement(); every = self.every
        def counted(s):
            idx = autoInc(); elapsed = time.time() - beginTime
            # same zero-elapsed-time clamp as in crt()
            rate = autoInc.value*every/max(elapsed, 1e-9)
            return f"{idx}, {int(elapsed)}s elapsed, throughput: {k1lib.fmt.throughput(rate)}"
        self.f = counted; return self
# Optional dependency probe. `PIL.Image` has to be imported explicitly: a bare
# `import PIL` does not load the `Image` submodule that the `file` cli
# dereferences via `PIL.Image.Image`.
try:
    import PIL.Image
    hasPIL = True
except Exception: # missing or broken install: image support is simply disabled
    hasPIL = False
class file(BaseCli):
    def __init__(self, fileName:str=None, flush:bool=False, mkdir:bool=False):
        """Opens a new file for writing. This will iterate through
        the iterator fed to it and put each element on a separate line. Example::

            # writes "0\\n1\\n2\\n" to file
            range(3) | file("test/f.txt")
            # same as above, but (maybe?) more familiar
            range(3) > file("test/f.txt")
            # returns ['0', '1', '2']
            cat("test/f.txt") | deref()

        If the input is a string, then it will just put the string into the
        file and does not iterate through the string::

            # writes "some text\\n123" to file, default iterator mode like above
            ["some text", "123"] | file("test/f.txt")
            # same as above, but this is a special case when it detects you're piping in a string
            "some text\\n123" | file("test/f.txt")

        If the input is a :class:`bytes` object or an iterator of :class:`bytes`, then it
        will open the file in binary mode and dumps the bytes in::

            # writes bytes to file
            b'5643' | file("test/a.bin")
            [b'56', b'43'] >> file("test/a.bin")
            # returns ['56435643']
            cat("test/a.bin") | deref()

        If the input is a :class:`PIL.Image.Image` object, then it will just save the image in
        the file::

            # creates an random image and saves it to a file
            torch.randn(100, 200) | toImg() | file("a.png")

        Reminder that the image pixel range is expected to be from 0 to 255.
        You can create temporary files on the fly by not specifying a file name::

            # creates temporary file
            url = range(3) > file()
            # returns ['0', '1', '2']
            cat(url) | deref()

        This can be especially useful when integrating with shell scripts that wants to
        read in a file::

            seq1 = "CCAAACCCCCCCTCCCCCGCTTC"
            seq2 = "CCAAACCCCCCCCTCCCCCCGCTTC"
            # use "needle" program to locally align 2 sequences
            None | cmd(f"needle {seq1 > file()} {seq2 > file()} -filter")

        You can also append to file with the ">>" operator::

            url = range(3) > file()
            # appended to file
            range(10, 13) >> file(url)
            # returns ['0', '1', '2', '10', '11', '12']
            cat(url) | deref()

        :param fileName: if not specified, create new temporary file and returns the url when pipes into it
        :param flush: whether to flush to file immediately after every iteration
        :param mkdir: whether to recursively make directories going to the file location or not"""
        super().__init__(); self.fileName = fileName; self.flush = flush; self.mkdir = mkdir
        self.append = False # whether to append to file rather than erasing it
    def __ror__(self, it:Iterator[str]) -> str:
        """Writes ``it`` to the file and returns the (expanded) file name.

        Strings and :class:`bytes` are written as a single unit, PIL images are
        saved directly, anything else is iterated over. Text or binary mode is
        decided by the type of the first element.

        :param it: string, bytes, PIL image, or an iterable of strings/bytes
        :return: path of the file that was written to"""
        super().__ror__(it); fileName = self.fileName; flushF = (lambda f: f.flush()) if self.flush else (lambda _: 0)
        if fileName is None:
            f = tempfile.NamedTemporaryFile()
            fileName = f.name; f.close()
        fileName = os.path.expanduser(fileName)
        if self.mkdir:
            # a bare file name has an empty dirname, and os.makedirs("") raises
            parent = os.path.dirname(fileName)
            if parent: os.makedirs(parent, exist_ok=True)
        if hasPIL and isinstance(it, PIL.Image.Image): it.save(fileName); return fileName
        # private sentinel, so that a genuine `None` first element is still written out
        missing = object(); firstLine = missing
        if isinstance(it, str): it = [it]; text = True
        elif isinstance(it, bytes): text = False
        else:
            it = iter(it); firstLine = next(it, missing)
            if firstLine is missing: # no elements at all
                # honour append mode: appending nothing must not wipe the file
                with open(fileName, "a" if self.append else "w") as f: f.write("")
                return fileName
            text = not isinstance(firstLine, bytes)
        if text:
            with open(fileName, "a" if self.append else "w") as f:
                if firstLine is not missing: f.write(f"{firstLine}\n")
                for line in it: f.write(f"{line}\n"); flushF(f)
        else:
            with open(fileName, "ab" if self.append else "wb") as f:
                if firstLine is not missing: # iterable of bytes chunks
                    f.write(firstLine)
                    for e in it: f.write(e); flushF(f)
                else: f.write(it) # a single raw bytes object
        return fileName
    def __rrshift__(self, it):
        """Same as piping with ``|``, but appends to the file instead of overwriting it."""
        self.append = True # why do this? because `a | b >> c` will be interpreted as `a | (b >> c)`
        if isinstance(it, BaseCli): return cli.serial(it, self)
        else: return self.__ror__(it)
    @property
    def name(self):
        """File name of this :class:`file`"""
        return self.fileName
class pretty(BaseCli):
    def __init__(self, delim="", left=True):
        """Pretty-formats a table, or a list of tables. Example::

            # These 2 statements are pretty much the same
            [range(10), range(10, 20)] | head(5) | pretty() > stdout()
            [range(10), range(10, 20)] | display()

        They both print::

            0    1    2    3    4    5    6    7    8    9
            10   11   12   13   14   15   16   17   18   19

        This can also pretty-formats multiple tables::

            [[range(10), range(10, 20)], [["abc", "defff"], ["1", "1234567"]]] | ~pretty() | joinStreams() | stdout()

        This will print::

            0     1         2    3    4    5    6    7    8    9
            10    11        12   13   14   15   16   17   18   19
            abc   defff
            1     1234567

        :param delim: delimiter between elements within a row. You might want
            to set it to "|" to create an artificial border or sth
        :param left: whether to left or right-align each element"""
        self.delim = delim; self.inverted = False; self.left = left
    def _typehint(self, inp): return tIter(str)
    def __ror__(self, it) -> Iterator[str]:
        """Formats the incoming table (or list of tables when inverted) into
        aligned lines. Column widths are shared across all tables."""
        inverted = self.inverted; sep = self.delim
        align = str.ljust if self.left else str.rjust
        it = init.dfGuard(it)
        if inverted: tables = [[list(row) for row in table] for table in it]
        else: tables = [[list(row) for row in it]]
        # first pass: stringify every cell in place and record the widest cell per column
        widths = defaultdict(lambda: 0)
        for table in tables:
            for row in table:
                for idx, cell in enumerate(row):
                    row[idx] = text = f"{cell}"
                    widths[idx] = max(len(text), widths[idx])
        # second pass: lazily emit each row, padded to the column width plus 3
        def render(table):
            for row in table:
                yield sep.join(align(cell.rstrip(" "), w+3) for w, cell in zip(widths.values(), row))
        if inverted: return tables | cli.apply(render)
        return render(tables[0])
    def __invert__(self):
        """Toggles between single-table and list-of-tables mode."""
        self.inverted = not self.inverted; return self
    def _jsF(self, meta):
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()
        return f"{fIdx} = ({dataIdx}) => {dataIdx}.pretty({json.dumps(self.delim)}, {cli.kjs.v(self.inverted)})", fIdx
def display(lines:int=10):
    """Convenience method for displaying a table.
    Pretty much equivalent to ``head() | pretty() | stdout()``.

    See also: :class:`pretty`

    :param lines: number of rows to show. If None, shows every row"""
    sink = pretty() | stdout()
    return sink if lines is None else cli.head(lines) | sink
def headOut(lines:int=10):
    """Convenience method for head() | stdout()

    :param lines: number of elements to print. If None, prints every element"""
    if lines is not None: return cli.head(lines) | stdout()
    return stdout()
class unpretty(BaseCli):
    def __init__(self, ncols:int=None, left=True, headers=None):
        """Takes in a stream of strings, assumes it's a table, and tries to split
        every line into multiple columns. Example::

            # returns ['0   1   2   ', '3   4   5   ', '6   7   8   ']
            a = range(10) | batched(3) | pretty() | deref()
            # returns [['0   ', '1   ', '2   '], ['3   ', '4   ', '5   '], ['6   ', '7   ', '8   ']]
            a | unpretty(3) | deref()

        This cli will take the number of columns requested and try to split into a
        table by analyzing at what character column does it transition from a space
        to a non-space (left align), or from a non-space to a space (right align).
        Then the first ``ncols`` most popular transitions are selected.

        Sometimes this is not robust enough, may be some of your columns have lots
        of empty elements, then the transition counts will be skewed, making it
        split up at strange places. In those cases, you can specify the headers
        directly, like this::

            # returns ['a   b   c    ', '3   5   11   ', '4   6   7    ']
            a = [["a", 3, 4], ["b", 5, 6], ["c", 11, 7]] | transpose() | pretty() | deref()
            # returns [['a   ', 'b   ', 'c    '], ['3   ', '5   ', '11   '], ['4   ', '6   ', '7    ']]
            a | unpretty(headers=["a", "b", "c"]) | deref()

        :param ncols: number of columns
        :param left: whether the data is left or right aligned
        :param headers: header texts that appear in the first row. If specified,
            columns start where these are found and ``ncols`` is ignored"""
        self.ncols = ncols; self.left = left; self.headers = headers
        self.pat = re.compile(" [^ ]+") if left else re.compile("[^ ]+ ")
    def __ror__(self, it):
        """Splits every line of ``it`` at the detected (or header-derived)
        character columns.

        :raises TypeError: if neither ``ncols`` nor ``headers`` was given
        :raises Exception: if fewer than 1 column is requested"""
        ncols = self.ncols; left = self.left; pat = self.pat; headers = self.headers
        if headers is not None: ncols = len(headers)
        # used to surface as an opaque "'<' not supported between NoneType and int"
        if ncols is None: raise TypeError("unpretty() needs either `ncols` or `headers` to be specified")
        if ncols < 1: raise Exception(f"Does not make sense to unpretty() into {ncols} columns")
        if ncols == 1: return it
        if headers is None:
            # the input is scanned here and iterated again below, so it has to be re-iterable
            try: len(it)
            except Exception: it = list(it)
            # most common space<->non-space transition columns, in ascending order
            splits = it | cli.head(10000) | (cli.apply(lambda x: (m.start()+1 for m in re.finditer(pat, x))) if left else cli.apply(lambda x: (m.end()-1 for m in re.finditer(pat, x))))\
                | cli.joinSt() | cli.count() | ~cli.sort() | cli.cut(1) | cli.head(ncols-1) | cli.sort(None) | cli.aS(list)
        else:
            firstRow, it = it | cli.peek()
            if it == []: return []
            # every header but the left-most one marks where a new column starts
            splits = sorted([firstRow.find(h) for h in headers])[1:]
        # Plain slice bounds instead of an eval()-generated lambda. `None` marks
        # the open ends, so each row becomes row[:s0], row[s0:s1], ..., row[sn:].
        # This also copes with fewer split points than requested (e.g. empty input).
        bounds = [None, *splits, None]; cuts = list(zip(bounds, bounds[1:]))
        return ([row[a:b] for a, b in cuts] for row in it)
def tab(text, pad=" "):
    """Prefixes every line of ``text`` with ``pad``.

    :param text: possibly multi-line string
    :param pad: string put in front of every line"""
    # one pad at the very start, then one after each line break
    return pad + text.replace("\n", "\n" + pad)
class intercept(BaseCli):
    def __init__(self, f=None, raiseError:bool=True, delay=0):
        """Intercept flow at a particular point, analyze the object piped in
        using the specified function "f", and raises error to stop flow. Example::

            3 | intercept()

        This is useful to diagnose what happens inside a mess of clis::

            ... | apply(A | B | intercept()) | ...                               # intercepts flow by throwing an error. Prints shape of element
            ... | apply(A | B | intercept(deref())) | ...                        # intercepts flow by throwing an error. Prints actual element
            ... | apply(A | B | intercept(deref() | aS(pprint.pformat))) | ...   # intercepts flow by throwing an error. Prints actual element, but pretty-formatted
            ... | apply(A | B | intercept(deref(), delay=3)) | ...               # intercepts flow by throwing an error. Prints actual element after intercept().__ror__ has been called 3 times

        :param f: prints out the object transformed by this function. By default it's :class:`~k1lib.cli.utils.shape`
        :param raiseError: whether to raise error when executed or not
        :param delay: number of initial calls that pass through untouched
            before this starts printing (and raising)"""
        self.f = f if f else cli.shape()
        self.raiseError = raiseError; self.delay = delay; self.count = 0
    def __ror__(self, s):
        """Passes ``s`` through. Once called more than ``delay`` times, prints
        its type and its transformed form, then raises if configured to."""
        self.count += 1
        if self.count <= self.delay: return s # still within the grace period
        print(type(s)); print(self.f(s))
        if self.raiseError: raise RuntimeError("intercepted")
        return s
class plotImgs(BaseCli):
    def __init__(self, col=5, aspect=1, fac=2, axis=False, table=False, im=False):
        """Plots a bunch of images at the same time in a table.
        Example::

            # plots all images
            [torch.randn(10, 20), torch.randn(20, 10)] | plotImgs()
            # plots all images with titles
            [[torch.randn(10, 20), "img 1"], [torch.randn(20, 10), "img 2"]] | plotImgs()

        If you have multiple rows with different number of images, you can
        plot that with this too, just set ``table=True`` like this::

            [[torch.randn(10, 20), torch.randn(20, 10)], [torch.randn(10, 20)]] | plotImgs(table=True)

        There's another cli that kinda does what this does: :class:`~k1lib.cli.utils.sketch`.
        You have more control over there, and it does roughly what this cli does, but the
        typical usage is different. This is more for plotting static, throwaway list of 2d
        arrays, like training set images, where as :class:`~k1lib.cli.utils.sketch` is more
        about plotting results of detailed analyses.

        :param col: number of columns in the table. If explicitly None, it will turn
            into the number of images fed. Not available if ``table=True``
        :param aspect: aspect ratio of each images, or ratio between width and height
        :param fac: figsize factor. The higher, the more resolution
        :param axis: whether to display the axis or not
        :param table: whether to plot using table mode
        :param im: if True, returns an image"""
        self.col = col; self.fac = fac; self.axis = axis; self.aspect = aspect; self.table = table; self.im = im
    def _draw(self, ax, im):
        """Draws a single image (or an ``[image, title]`` pair) onto ``ax``."""
        plt.sca(ax)
        if isinstance(im, (list, tuple)): plt.imshow(im[0]); plt.title(im[1])
        else: plt.imshow(im)
        if not self.axis: ax.axis("off")
    def __ror__(self, imgs):
        """Plots the images. Returns the rendered figure as an image if
        ``im=True``, else returns None. Nothing is plotted for empty input."""
        imgs = imgs | cli.deref(); col = self.col; fac = self.fac; aspect = self.aspect**0.5
        if not self.table: # main code
            if len(imgs) == 0: return
            if col is None or col > len(imgs): col = len(imgs)
            n = math.ceil(len(imgs)/col)
            fig, axes = plt.subplots(n, col, figsize=(col*fac*aspect, n*fac/aspect))
            # plt.subplots hands back a bare Axes for a 1x1 grid
            axes = axes.flatten() if isinstance(axes, np.ndarray) else [axes]
            for ax, im in zip(axes, imgs): self._draw(ax, im)
            for i in range(len(imgs), len(axes)): axes[i].remove() # removing leftover axes
        else:
            if col != 5: raise Exception("Currently in table mode, can't set `col` parameter") # change this value to match col's default value
            if len(imgs) == 0: return # nothing to plot, same as the non-table mode
            h = imgs | cli.shape(0); w = imgs | cli.shape(0).all() | cli.toMax()
            if w == 0: return # every row is empty
            # squeeze=False keeps `axes` 2-dimensional even for a single row or
            # a single column; otherwise the nested loop below would try to
            # iterate over a bare Axes object
            fig, axes = plt.subplots(h, w, figsize=(w*fac*aspect, h*fac/aspect), squeeze=False)
            for rAx, rIm in zip(axes, imgs):
                for cAx, cIm in zip(rAx, rIm): self._draw(cAx, cIm)
                for i in range(len(rIm), len(rAx)): rAx[i].remove() # removing leftover axes
        plt.tight_layout()
        if self.im: return plt.gcf() | cli.toImg()