# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
"""
This is for all short utilities that converts from 1 data type to another. They
might feel they have different styles, as :class:`toFloat` converts object iterator to
float iterator, while :class:`toImg` converts single image url to single PIL image,
whereas :class:`toSum` converts float iterator into a single float value.
The general convention is, if the intended operation sounds simple (convert to floats,
strings, types, ...), then most likely it will convert iterator to iterator, as you
can always use the function directly if you only want to apply it on 1 object.
If it sounds complicated (convert to PIL image, tensor, ...) then most likely it will
convert object to object. Lastly, there are some that just feels right to input
an iterator and output a single object (like getting max, min, std, mean values)."""
__all__ = ["toNdArray", "toTensor", "toRange", "toList",
"toSum", "toProd", "toAvg", "toMean", "toStd", "toMedian", "toMax", "toMin", "toArgmin", "toArgmax",
"toImg", "toRgb", "toRgba", "toGray", "toDict",
"toFloat", "toInt", "toBytes", "toDataUri", "toAnchor", "toHtml",
"toAscii", "toHash", "toCsv", "toYaml", "Audio", "toAudio", "toUnix", "toIso", "toYMD", "toLinks",
"toMovingAvg", "toCm", "Pdf", "toPdf", "toDist", "toAngle", "idxsToNdArray", "toFileType"]
import re, k1lib, math, os, numpy as np, io, json, base64, unicodedata, inspect, time
from k1lib.cli.init import BaseCli, T, yieldT; import k1lib.cli as cli, k1lib.cli.init as init
from k1lib.cli.typehint import *; mpl = k1lib.dep.mpl; plt = k1lib.dep.plt; yaml = k1lib.dep.yaml; pd = k1lib.dep.pd; cm = k1lib.dep.cm
from collections import deque, defaultdict; from typing import Iterator, Any, List, Set, Tuple, Dict, Callable, Union
settings = k1lib.settings.cli; imgkit = k1lib.dep("imgkit", url="https://github.com/csquared/IMGKit")
try: import PIL; import PIL.Image; hasPIL = True
except: hasPIL = False
try: import torch; hasTorch = True
except: torch = k1lib.dep.torch; hasTorch = False
try: import rdkit; hasRdkit = True
except: hasRdkit = False
try: import graphviz; hasGraphviz = True
except: hasGraphviz = False
try: import plotly; import plotly.express as px; hasPlotly = True
except: hasPlotly = False
try: import pandas as pd; pd.core; hasPandas = True
except: hasPandas = False
class toNdArray(BaseCli):
    blurb = "Converts several data types to numpy.ndarray"
    def __init__(self, dtype=None):
        """Converts generator/several data types to :class:`numpy.ndarray`. Essentially
``np.array(list(it))``. Can convert PIL Image. Example::

    range(3) | toNdArray()           # returns array([0, 1, 2])
    range(3) | toNdArray(np.float32) # returns array([0., 1., 2.], dtype=float32)

:param dtype: if specified, cast the resulting array to this dtype"""
        self.dtype = dtype
    def _all_array_opt(self, it, level):
        # .all() fast path. Fix: the original called the nonexistent ndarray
        # method `.asdtype()`, which raised AttributeError for every torch
        # tensor input; use `.astype()` and skip the cast when dtype is None
        if hasTorch and isinstance(it, torch.Tensor):
            arr = it.detach().cpu().numpy()
            return arr.astype(self.dtype) if self.dtype else arr
        return it
    def __ror__(self, it):
        # custom data types can provide a ._toNdArray() hook, optionally taking `dtype`
        if hasattr(it, "_toNdArray"):
            args = inspect.getfullargspec(it._toNdArray).args[1:]; n = len(args)
            s = set(["dtype"]); weirdArgs = [a for a in args if a not in s]
            if len(weirdArgs) > 0: raise Exception(f"Custom datatype `{type(it)}` has ._toNdArray() method, which expects only `dtype` arguments, but detected these arguments instead: {weirdArgs}. Please fix `{type(it)}`")
            return it._toNdArray() if n == 0 else it._toNdArray(self.dtype)
        if hasPIL and isinstance(it, PIL.Image.Image):
            # decode PIL image into a channel-first (C, H, W) array, choosing
            # a numpy dtype appropriate for the image mode
            mode_to_nptype = {'I': np.int32, 'I;16': np.int16, 'F': np.float32}
            img = np.array(it, mode_to_nptype.get(it.mode, np.uint8), copy=True)
            if it.mode == '1': img = 255 * img # binary mode: scale {0,1} up to {0,255}
            img = img.reshape((it.size[1], it.size[0], len(it.getbands())))
            it = img.transpose((2, 0, 1))
        if hasPandas and isinstance(it, pd.core.arraylike.OpsMixin): it = it.to_numpy()
        if hasTorch and isinstance(it, torch.Tensor): it = it.numpy()
        if not isinstance(it, np.ndarray): it = np.array(list(it))
        return it.astype(self.dtype) if self.dtype else it
class toTensor(BaseCli):
    blurb = "Converts several data types to torch.Tensor"
    def __init__(self, dtype=None):
        """Converts generator to :class:`torch.Tensor`, roughly
``torch.tensor(list(it))``. Default dtype is float32. Can convert PIL Image.
Example::

    # returns tensor([0., 1., 2.], dtype=torch.float64)
    range(3) | toTensor(torch.float64)

:param dtype: target dtype, defaults to ``torch.float32``"""
        self.dtype = dtype or torch.float32
    def _all_array_opt(self, it, level): return torch.tensor(it, dtype=self.dtype)
    def __ror__(self, it:Iterator[float]) -> "torch.Tensor":
        # custom data types may expose a ._toTensor() hook, optionally accepting `dtype`
        if hasattr(it, "_toTensor"):
            hookArgs = inspect.getfullargspec(it._toTensor).args[1:]
            unexpected = [a for a in hookArgs if a != "dtype"]
            if unexpected: raise Exception(f"Custom datatype `{type(it)}` has ._toTensor() method, which expects only `dtype` arguments, but detected these arguments instead: {unexpected}. Please fix `{type(it)}`")
            return it._toTensor(self.dtype) if len(hookArgs) else it._toTensor()
        # anything that's not already a tensor goes through toNdArray() first
        if not isinstance(it, torch.Tensor): it = torch.from_numpy(it | toNdArray())
        return it.to(self.dtype)
class toList(BaseCli): # kept around because some LLVM optimizations are keyed on this class
    def __init__(self):
        """Converts generator to list, exactly like ``aS(list)``.
Example::

    range(5) | toList()  # returns [0, 1, 2, 3, 4]
    range(5) | aS(list)  # returns [0, 1, 2, 3, 4]

Sort of outdated: prefer ``aS(list)`` in new code. This still works fine
and is kept so that old projects don't break."""
        super().__init__()
    def _all_array_opt(self, it, level): return it
    def _typehint(self, inp):
        # lists/iters/sets keep their element type; collections pass through unchanged
        if isinstance(inp, tListIterSet): return tList(inp.child)
        return inp if isinstance(inp, tCollection) else tList(tAny())
    def __ror__(self, it:Iterator[Any]) -> List[Any]: return list(init.dfGuard(it))
    def _jsF(self, meta):
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()
        return f"{fIdx} = ({dataIdx}) => {dataIdx}", fIdx
def _toRange(it): # _toRange
for i, _ in enumerate(it): yield i # _toRange
class toRange(BaseCli):
    blurb = "Returns iter(range(len(it))), but incrementally"
    def __init__(self):
        """Yields 0, 1, 2, ... one index per input element, falling back to
lazy counting when the input has no ``len()``. Example::

    # returns [0, 1, 2]
    [3, 2, 5] | toRange() | deref()"""
        super().__init__()
    def __ror__(self, it:Iterator[Any]) -> Iterator[int]:
        try: n = len(it)
        except: return _toRange(it) # no len() -> count lazily
        return range(n)
    def _jsF(self, meta):
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()
        return f"{fIdx} = ({dataIdx}) => {dataIdx}.toRange()", fIdx
# optimization pass: two consecutive toRange() are redundant, keep only the first
tOpt.addPass(lambda cs, ts, _: [cs[0]], [toRange, toRange])
# torch.Tensor is only registered as an "array type" when torch is actually installed
settings.add("arrayTypes", (torch.Tensor, np.ndarray) if hasTorch else (np.ndarray,), "default array types used to accelerate clis")
def genericTypeHint(inp):
    # unwraps the element type from container-ish type hints; anything
    # unrecognized degrades to tAny()
    if isinstance(inp, tListIterSet):
        return inp.child
    if isinstance(inp, tCollection):
        return inp.children[0]
    if isinstance(inp, tArrayTypes):
        return inp.child
    return tAny()
class toSum(BaseCli):
    blurb = "Calculates the sum of a list of numbers"
    def __init__(self):
        """Calculates the sum of a list of numbers. Can pipe in :class:`torch.Tensor`
or :class:`numpy.ndarray`. Example::

    range(10) | toSum()  # returns 45
    np.random.randn(2, 3, 4) | toSum().all() | shape()  # returns (2,)"""
        super().__init__()
    def _all_array_opt(self, it, level):
        # sum away every dimension beyond `level` using the matching backend
        if isinstance(it, np.ndarray): backend = np
        elif hasTorch and isinstance(it, torch.Tensor): backend = torch
        else: return NotImplemented
        return backend.sum(it, tuple(range(level, len(it.shape))))
    def _typehint(self, inp): return genericTypeHint(inp)
    def __ror__(self, it:Iterator[float]):
        if isinstance(it, settings.arrayTypes): return it.sum()
        if hasPandas and isinstance(it, pd.Series): return it.sum()
        return sum(init.dfGuard(it))
    def _jsF(self, meta):
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()
        return f"{fIdx} = ({dataIdx}) => {dataIdx}.toSum()", fIdx
class toProd(BaseCli):
    blurb = "Calculates the product of a list of numbers"
    def __init__(self):
        """Calculates the product of a list of numbers. Can pipe in :class:`torch.Tensor`
or :class:`numpy.ndarray`. Example::

    range(1,10) | toProd()  # returns 362880
    np.random.randn(2, 3, 4) | toProd().all() | shape()  # returns (2,)"""
        super().__init__()
    def _all_array_opt(self, it, level):
        ndim = len(it.shape)
        if isinstance(it, np.ndarray):
            return np.prod(it, tuple(range(level, ndim)))
        if hasTorch and isinstance(it, torch.Tensor):
            # torch.prod() only reduces one dim per call, so fold repeatedly at `level`
            for _ in range(level, ndim): it = torch.prod(it, level)
            return it
        return NotImplemented
    def _typehint(self, inp): return genericTypeHint(inp)
    def __ror__(self, it):
        if isinstance(it, settings.arrayTypes): return it.prod()
        if hasPandas and isinstance(it, pd.Series): return it.prod()
        return math.prod(init.dfGuard(it))
    def _jsF(self, meta):
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()
        return f"{fIdx} = ({dataIdx}) => {dataIdx}.toProd()", fIdx
class toAvg(BaseCli):
    blurb = "Calculates the average of a list of numbers"
    def __init__(self):
        """Calculates average of a list of numbers. Can pipe in :class:`torch.Tensor`
or :class:`numpy.ndarray`. Example::

    range(10) | toAvg()  # returns 4.5
    [] | toAvg()         # returns nan (when settings.cli.strict is False)
    np.random.randn(2, 3, 4) | toAvg().all() | shape()  # returns (2,)"""
        super().__init__()
    def _all_array_opt(self, it, level):
        if isinstance(it, np.ndarray): backend = np
        elif hasTorch and isinstance(it, torch.Tensor): backend = torch
        else: return NotImplemented
        return backend.mean(it, tuple(range(level, len(it.shape))))
    def _typehint(self, inp):
        # averaging ints produces floats; other element types pass through
        if isinstance(inp, tListIterSet) or isinstance(inp, tArrayTypes): elem = inp.child
        elif isinstance(inp, tCollection): elem = inp.children[0]
        else: return tAny()
        return float if elem == int else elem
    def __ror__(self, it:Iterator[float]):
        if isinstance(it, settings.arrayTypes): return it.mean()
        if hasPandas and isinstance(it, pd.Series): return it.mean()
        total = 0; count = 0
        for v in init.dfGuard(it): total += v; count += 1
        # empty input: nan in lenient mode, ZeroDivisionError in strict mode
        if count == 0 and not k1lib.settings.cli.strict: return float("nan")
        return total / count
    def _jsF(self, meta):
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()
        return f"{fIdx} = ({dataIdx}) => {dataIdx}.toAvg()", fIdx
if hasTorch: # define a torchStd() shim: torch 1.x and 2.x take the ddof differently
    torchVer = int(torch.__version__.split(".")[0]) # major version number only
    if torchVer >= 2:
        # torch >= 2: std() accepts an arbitrary correction factor (== ddof)
        def torchStd(it, ddof, dim=None): return torch.std(it, dim, correction=ddof)
    else:
        # torch 1.x: only unbiased=True/False is available, i.e. ddof of 1 or 0
        def torchStd(it, ddof, dim=None):
            if ddof == 0: return torch.std(it, dim, unbiased=False)
            if ddof == 1: return torch.std(it, dim, unbiased=True)
            raise Exception(f"Please install PyTorch 2, as version 1 don't support correction factor of {ddof}")
else:
    # stub so later references still resolve when torch is absent
    def torchStd(it, ddof): raise Exception("PyTorch not installed")
class toStd(BaseCli):
    blurb = "Calculates the standard deviation of a list of numbers"
    def __init__(self, ddof:int=0):
        """Calculates standard deviation of a list of numbers. Can pipe in
:class:`torch.Tensor` or :class:`numpy.ndarray` to be faster. Example::

    range(10) | toStd()  # returns 2.8722813232690143
    [] | toStd()         # returns nan
    np.random.randn(2, 3, 4) | toStd().all() | shape()  # returns (2,)

:param ddof: "delta degree of freedom". The divisor used in calculations is ``N - ddof``"""
        self.ddof = ddof
    def _all_array_opt(self, it, level):
        dims = tuple(range(level, len(it.shape)))
        if isinstance(it, np.ndarray): return np.std(it, ddof=self.ddof, axis=dims)
        if hasTorch and isinstance(it, torch.Tensor): return torchStd(it, self.ddof, dims)
        return NotImplemented
    def __ror__(self, it):
        ddof = self.ddof
        if hasPandas:
            if isinstance(it, pd.Series): return it.std(ddof=ddof)
            if isinstance(it, pd.DataFrame): it = init.dfGuard(it)
        if isinstance(it, np.ndarray): return np.std(it, ddof=ddof)
        if hasTorch and isinstance(it, torch.Tensor): return torchStd(it, ddof)
        # generic iterables get materialized and handed to numpy
        return np.std(np.array(list(it)), ddof=ddof)
    def _jsF(self, meta):
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()
        return f"{fIdx} = ({dataIdx}) => {dataIdx}.toStd()", fIdx
toMean = toAvg # alias: "mean" and "average" are the same operation
class toMax(BaseCli):
    blurb = "Calculates the max value of a list of numbers"
    def __init__(self):
        """Calculates the max of a bunch of numbers. Can pipe in :class:`torch.Tensor`
or :class:`numpy.ndarray`. Example::

    [2, 5, 6, 1, 2] | toMax()  # returns 6
    np.random.randn(2, 3, 4) | toMax().all() | shape()  # returns (2,)"""
        super().__init__()
    def _all_array_opt(self, it, level):
        ndim = len(it.shape)
        if isinstance(it, np.ndarray):
            return np.max(it, tuple(range(level, ndim)))
        if hasTorch and isinstance(it, torch.Tensor):
            # torch.max(t, dim) returns (values, indices); keep values, reduce one dim per pass
            for _ in range(level, ndim): it = torch.max(it, level)[0]
            return it
        return NotImplemented
    def __ror__(self, it:Iterator[float]) -> float:
        if isinstance(it, settings.arrayTypes): return it.max()
        if hasPandas and isinstance(it, pd.Series): return it.max()
        return max(it)
    def _jsF(self, meta):
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()
        return f"{fIdx} = ({dataIdx}) => {dataIdx}.toMax()", fIdx
class toMin(BaseCli):
    blurb = "Calculates the min value of a list of numbers"
    def __init__(self):
        """Calculates the min of a bunch of numbers. Can pipe in :class:`torch.Tensor`
or :class:`numpy.ndarray`. Example::

    [2, 5, 6, 1, 2] | toMin()  # returns 1
    np.random.randn(2, 3, 4) | toMin().all() | shape()  # returns (2,)"""
        super().__init__()
    def _all_array_opt(self, it, level):
        ndim = len(it.shape)
        if isinstance(it, np.ndarray):
            return np.min(it, tuple(range(level, ndim)))
        if hasTorch and isinstance(it, torch.Tensor):
            # torch.min(t, dim) returns (values, indices); keep values, reduce one dim per pass
            for _ in range(level, ndim): it = torch.min(it, level)[0]
            return it
        return NotImplemented
    def __ror__(self, it:Iterator[float]) -> float:
        if isinstance(it, settings.arrayTypes): return it.min()
        if hasPandas and isinstance(it, pd.Series): return it.min()
        return min(it)
    def _jsF(self, meta):
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()
        return f"{fIdx} = ({dataIdx}) => {dataIdx}.toMin()", fIdx
class toArgmin(BaseCli):
    blurb = "Grabs the min value's index"
    def __init__(self):
        """Get the input iterator's index of the min value.
Example::

    [2, 3, 4, 1, 5] | toArgmin()  # returns 3
    np.random.randn(3, 4, 5) | toArgmin().all() | shape()  # returns (3,)"""
        pass
    def _all_array_opt(self, it, level):
        n = len(it.shape)
        # fix: it.shape holds ints, so they must be str()-ed before joining,
        # otherwise building this very error message raised TypeError instead
        if n < level: raise Exception(f"You're trying to do `np.random.randn({', '.join(map(str, it.shape))}) | toArgmin().all({level})` which does not make sense, as the array's dimension is less than the .all() dimension")
        it = it | cli.joinSt(n-level-1).all(level); return it.argmin(level)
    def __ror__(self, it):
        if isinstance(it, k1lib.settings.cli.arrayTypes): return it.argmin().item()
        # NOTE(review): newer pandas returns a plain int from Series.argmin(),
        # on which .item() would fail — verify against the pinned pandas version
        if hasPandas and isinstance(it, pd.Series): return it.argmin().item()
        try: len(it); return np.array(it).argmin().item() # sized input: convert directly
        except: return np.array(it | cli.deref()).argmin().item() # generator: materialize first
class toArgmax(BaseCli):
    blurb = "Grabs the max value's index"
    def __init__(self):
        """Get the input iterator's index of the max value.
Example::

    [2, 3, 4, 1, 5] | toArgmax()  # returns 4
    np.random.randn(3, 4, 5) | toArgmax().all() | shape()  # returns (3,)"""
        pass
    def _all_array_opt(self, it, level):
        n = len(it.shape)
        # fix: it.shape holds ints, so they must be str()-ed before joining,
        # otherwise building this very error message raised TypeError instead
        if n < level: raise Exception(f"You're trying to do `np.random.randn({', '.join(map(str, it.shape))}) | toArgmax().all({level})` which does not make sense, as the array's dimension is less than the .all() dimension")
        it = it | cli.joinSt(n-level-1).all(level); return it.argmax(level)
    def __ror__(self, it):
        if isinstance(it, k1lib.settings.cli.arrayTypes): return it.argmax().item()
        # NOTE(review): newer pandas returns a plain int from Series.argmax(),
        # on which .item() would fail — verify against the pinned pandas version
        if hasPandas and isinstance(it, pd.Series): return it.argmax().item()
        try: len(it); return np.array(it).argmax().item() # sized input: convert directly
        except: return np.array(it | cli.deref()).argmax().item() # generator: materialize first
# font used when toImg() renders lists of text lines into an image
settings.add("font", None, "default font file. Best to use .ttf files, used by toImg()")
# nested settings group, accessed as settings.chem.imgSize
settings.add("chem", k1lib.Settings().add("imgSize", 200, "default image size used in toImg() when drawing rdkit molecules"), "chemistry-related settings")
class Svg(str):
    """A plain string subclass that marks its contents as raw svg markup"""
    def _toImg(self, **kwargs):
        # rasterize through cairosvg into a temp png file, then load it as PIL
        import tempfile; import cairosvg
        tmp = tempfile.NamedTemporaryFile()
        cairosvg.svg2png(bytestring=f"{self}", write_to=tmp.name)
        return tmp.name | toImg()
    def _repr_html_(self): return self # notebooks can render svg markup directly
def cropToContentNp(ogIm, pad=10):
    """Crops a numpy image to its content plus ``pad`` pixels of border.
"Content" is every pixel that differs from the max (background) value.

:param ogIm: image array, shape (H, W) grayscale or (C, H, W) channel-first
:param pad: number of border pixels to keep around the content"""
    dim = len(ogIm.shape); im = ogIm
    if dim > 2: im = im.mean(0) # collapse channels for content detection
    coords = np.argwhere(im.max()-im) # nonzero wherever pixel != background
    x_min, y_min = coords.min(axis=0); x_max, y_max = coords.max(axis=0)
    # fix: clamp the slice starts at 0. When content sits within `pad` pixels
    # of the top/left edge, x_min-pad went negative and numpy interpreted it
    # as wrap-around indexing, producing an empty (or wrong) crop
    x0 = max(0, x_min-pad); y0 = max(0, y_min-pad)
    x1 = x_max+1+pad; y1 = y_max+1+pad
    return ogIm[x0:x1, y0:y1] if dim == 2 else ogIm[:, x0:x1, y0:y1]
def cropToContentPIL(im, pad=0):
    # PIL counterpart of cropToContentNp: PIL -> int tensor (C, H, W) -> numpy -> crop
    im = im | toTensor(int) | cli.op().numpy() | cli.aS(cropToContentNp, pad)
    # back to PIL; channel images must be permuted to PIL's (H, W, C) layout first
    return torch.from_numpy(im).permute(1, 2, 0) | toImg() if len(im.shape) > 2 else im | toImg()
class toImg(BaseCli):
    blurb = "Converts multiple data types into a PIL image"
    def __init__(self, closeFig=True, crop=True):
        """Converts multiple data types into a PIL image.
Example::

    ls(".") | toImg().all() | item()  # grabs first image in the current folder
    torch.randn(100, 200) | toImg()   # converts from tensor/array to image
    "abc.jpg" | toImg() | toBytes() | toImg()  # grabs image, converts to byte stream, and converts back to image
    ["abc", "def"] | toImg()          # converts paragraphs to image
    "c1ccc(C)cc1" | toMol() | toImg() # converts SMILES string to molecule, then to image
    ["ab", "bc", "ca"] | (kgv.sketch() | kgv.edges()) | toHtml() | toImg()  # sketches a graphviz plot, converts to svg then renders the svg as an image
    df | toHtml() | toImg()           # converts pandas data frame to html, then render it to image

You can also save a matplotlib figure by piping in a :class:`matplotlib.figure.Figure` object::

    x = np.linspace(0, 4)
    plt.plot(x, x**2)
    plt.gcf() | toImg()

.. note::

    If you are working with image tensors, which typically have dimensions
    of (C, H, W), you have to permute it to PIL's (H, W, C) first before
    passing it into this cli. Also it's expected that your tensor image
    ranges from 0-255, and not 0-1. Make sure you renormalize it

:param closeFig: if input is a matplotlib figure, then closes the figure after generating the image
:param crop: whether to crop white spaces around an image or not"""
        import PIL; self.PIL = PIL; self.closeFig = closeFig; self.crop = crop
    def _typehint(self, inp):
        return PIL.Image.Image
    def __ror__(self, path) -> "PIL.Image.Image":
        # custom datatypes can provide their own ._toImg() conversion hook
        if hasattr(path, "_toImg"): return path._toImg(closeFig=self.closeFig, crop=self.crop)
        if isinstance(path, str): return self.PIL.Image.open(os.path.expanduser(path))
        if isinstance(path, bytes): return self.PIL.Image.open(io.BytesIO(path))
        if hasTorch and isinstance(path, torch.Tensor): path = path.numpy()
        if isinstance(path, np.ndarray):
            return self.PIL.Image.fromarray(path.astype("uint8"))
        if isinstance(path, mpl.figure.Figure):
            canvas = path.canvas; canvas.draw()
            img = self.PIL.Image.frombytes('RGB', canvas.get_width_height(), canvas.tostring_rgb())
            if self.closeFig: plt.close(path)
            return img | cli.aS(cropToContentPIL)
        if hasGraphviz and isinstance(path, graphviz.Digraph):
            # render to a temp jpeg, load it back, then best-effort clean up
            import tempfile; a = tempfile.NamedTemporaryFile()
            path.render(a.name, format="jpeg")
            fn = f"{a.name}.jpeg"; im = fn | toImg()
            try: os.remove(fn)
            except: pass
            return im
        if hasRdkit and isinstance(path, rdkit.Chem.rdchem.Mol):
            sz = settings.chem.imgSize
            return self.__ror__(rdkit.Chem.Draw.MolsToGridImage([path], subImgSize=[sz, sz]).data) | (cli.aS(cropToContentPIL) if self.crop else cli.iden())
        if hasPandas and isinstance(path, pd.DataFrame): path = path | cli.toHtml()
        if isinstance(path, k1lib.viz.Html): return imgkit.from_string(path, False, options={'format': 'jpg'}) | toImg()
        path = path | cli.deref()
        if len(path) > 0 and isinstance(path[0], str):
            # render a list of text lines onto a fresh grayscale canvas
            from PIL import ImageDraw
            h = path | cli.shape(0); w = path | cli.shape(0).all() | cli.aS(max)
            image = self.PIL.Image.new("L", ((w+1)*20, (h+1)*60), 255)
            font = PIL.ImageFont.truetype(settings.font, 18) if settings.font else None
            ImageDraw.Draw(image).text((20, 20), path | cli.join("\n"), 0, font=font)
            # fix: `iden()` was unqualified (NameError at runtime); qualify it as
            # cli.iden(), consistent with the rdkit branch above
            return np.array(image)/255 | (cli.aS(cropToContentNp) if self.crop else cli.iden()) | cli.op()*255 | toImg()
        return NotImplemented
# types that toNpImg() can handle directly, without first routing through toImg()
_nonNpImgTypes = [np.ndarray]
if hasTorch: _nonNpImgTypes.append(torch.Tensor)
if hasPIL: _nonNpImgTypes.append(PIL.Image.Image)
_nonNpImgTypes = tuple(_nonNpImgTypes) # tuple, so it can be passed to isinstance()
class toNpImg(BaseCli):
    def __init__(self):
        """Converts to a numpy array containing the image data"""
        pass
    def __ror__(self, it):
        if hasattr(it, "_toNpImg"): return it._toNpImg() # custom hook wins
        # normalize unknown types into a PIL image first, then step down the
        # chain PIL -> tensor -> numpy -> uint8
        if not isinstance(it, _nonNpImgTypes): it = it | toImg()
        if hasPIL and isinstance(it, PIL.Image.Image): it = it | toTensor()
        if hasTorch and isinstance(it, torch.Tensor): it = it.numpy()
        return it.astype(np.uint8) if isinstance(it, np.ndarray) else it
class toRgb(BaseCli):
    blurb = "Converts grayscale/rgb PIL image to rgb image"
    def __init__(self):
        """Converts greyscale/rgb PIL image to rgb image.
Example::

    # reads image file and converts it to rgb
    "a.png" | toImg() | toRgb()"""
        import PIL; self.PIL = PIL
    def _typehint(self, inp): return inp
    def __ror__(self, i):
        if hasattr(i, "_toRgb"): return i._toRgb() # custom hook wins
        if i.getbands() == ("R", "G", "B"): return i # already rgb, nothing to do
        out = self.PIL.Image.new("RGB", i.size)
        out.paste(i)
        return out
class toRgba(BaseCli):
    blurb = "Converts random PIL image to rgba image"
    def __init__(self):
        """Converts random PIL image to rgba image.
Example::

    # reads image file and converts it to rgba
    "a.png" | toImg() | toRgba()"""
        import PIL; self.PIL = PIL
    def _typehint(self, inp): return inp
    def __ror__(self, i):
        if hasattr(i, "_toRgba"): return i._toRgba() # custom hook wins
        if i.getbands() == ("R", "G", "B", "A"): return i # already rgba, nothing to do
        out = self.PIL.Image.new("RGBA", i.size)
        out.paste(i)
        return out
class toGray(BaseCli):
    blurb = "Converts random PIL image to a grayscale image"
    def __init__(self):
        """Converts random PIL image to a grayscale image.
Example::

    # reads image file and converts it to grayscale
    "a.png" | toImg() | toGray()"""
        import PIL; self.PIL = PIL
    def _typehint(self, inp): return inp
    def __ror__(self, i):
        if hasattr(i, "_toGray"): return i._toGray()
        # fix: PIL's .getbands() returns a tuple, so the already-gray check must
        # compare against ("L",). The old `== ("L")` compared a tuple against the
        # plain string "L", was always False, and re-converted gray images for nothing
        if i.getbands() == ("L",): return i
        return self.PIL.ImageOps.grayscale(i)
class toDict(BaseCli):
    blurb = "Converts 2 Iterators, 1 key, 1 value into a dictionary"
    def __init__(self, rows=True, f=None):
        """Converts 2 Iterators, 1 key, 1 value into a dictionary.
Example::

    [[1, 3], [2, 4]] | toDict()       # returns {1: 3, 2: 4}
    [[1, 2], [3, 4]] | toDict(False)  # returns {1: 3, 2: 4}

If ``rows`` is a string, then it will build a dictionary from key-value
pairs delimited by this character. For example::

    ['gene_id "ENSG00000290825.1"',
     'level 2',
     'tag "basic"'] | toDict(" ")
    # returns {'gene_id': '"ENSG00000290825.1"', 'level': '2', 'tag': '"basic"'}

:param rows: if True, reads input in row by row, else reads in list of columns.
    If a string, treats each line as key-value pairs delimited by that string
:param f: if specified, return a defaultdict that uses this function as its generator"""
        self.rows = rows
        if f is not None: self.f = lambda d: defaultdict(f, d)
        else: self.f = lambda x: x
    def __ror__(self, it) -> dict:
        r = self.rows; f = self.f
        if r:
            if isinstance(r, str):
                # fix: the delimiter was hard-coded as " " regardless of the value
                # of `rows`; split/rejoin on the actual delimiter instead. Also
                # apply self.f here, which the old recursive implementation dropped
                d = {}
                for line in it:
                    key, *rest = line.split(r)
                    d[key] = r.join(rest)
                return f(d)
            return f({_k: _v for _k, _v in it})
        return f({_k: _v for _k, _v in zip(*it)})
    def _jsF(self, meta):
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()
        if not self.rows: raise Exception("toDict._jsF() doesn't support .rows=False yet")
        return f"{fIdx} = ({dataIdx}) => {dataIdx}.toDict()", fIdx
def _toop(toOp, c, force, defaultValue):
    # shared core of toFloat/toInt: apply converter `toOp` to column `c` (None =
    # whole row), then either substitute failed conversions (None) with
    # `defaultValue` (force=True), or drop the offending rows (force=False).
    # NOTE: `x or defaultValue` also maps legit falsy results (0, 0.0) to the
    # default — harmless here, since the default passed in is always 0/0.0
    return cli.apply(toOp, c) | (cli.apply(lambda x: x or defaultValue, c) if force else cli.filt(cli.op() != None, c))
def _toFloat(e) -> Union[float, None]: # _toFloat
try: return float(e) # _toFloat
except: return None # _toFloat
class toFloat(BaseCli):
    blurb = "Converts an iterator into a list of floats"
    def __init__(self, *columns, mode=2):
        """Converts every row into a float. Example::

    # returns [1, 3, -2.3]
    ["1", "3", "-2.3"] | toFloat() | deref()
    # returns [[1.0, 'a'], [2.3, 'b'], [8.0, 'c']]
    [["1", "a"], ["2.3", "b"], [8, "c"]] | toFloat(0) | deref()

With weird rows::

    # returns [[1.0, 'a'], [8.0, 'c']]
    [["1", "a"], ["c", "b"], [8, "c"]] | toFloat(0) | deref()
    # returns [[1.0, 'a'], [0.0, 'b'], [8.0, 'c']]
    [["1", "a"], ["c", "b"], [8, "c"]] | toFloat(0, mode=1) | deref()

(the second example used to read ``force=True``, a parameter that doesn't
exist — ``mode=1`` is the real spelling)

This also works well with :class:`torch.Tensor` and :class:`numpy.ndarray`,
as they will not be broken up into an iterator::

    # returns a numpy array, instead of an iterator
    np.array(range(10)) | toFloat()

:param columns: if nothing, then will convert each row. If available, then
    convert all the specified columns
:param mode: different conversion styles

    - 0: simple ``float()`` function, fastest, but will throw errors if it can't be parsed
    - 1: if there are errors, then replace it with zero
    - 2: if there are errors, then eliminate the row"""
        self.columns = columns; self.mode = mode
    def __ror__(self, it):
        columns = self.columns; mode = self.mode
        if len(columns) == 0:
            if hasPandas and isinstance(it, pd.core.arraylike.OpsMixin): it = it.to_numpy()
            if isinstance(it, np.ndarray): return it.astype(float)
            # fix: guard with hasTorch like the rest of the file, so the
            # isinstance() check doesn't run against the dep-stub when torch is absent
            if hasTorch and isinstance(it, torch.Tensor): return it.float()
            if mode == 0: return (float(e) for e in it)
            return it | _toop(_toFloat, None, mode == 1, 0.0)
        else:
            if hasPandas and isinstance(it, pd.DataFrame):
                cols = [it[c] for c in list(it)]; nameGen = it.newColName(None)
                for c in columns: cols[c] = cols[c].copy().astype(float)
                return pd.DataFrame({getattr(c, "name", next(nameGen)):c for c in cols})
            return it | cli.init.serial(*(_toop(_toFloat, c, mode == 1, 0.0) for c in columns))
    def _jsF(self, meta):
        # fix: `mode` was read but never defined in this method (NameError on every
        # call); grab it from self. Also the no-columns mode==2 branch forgot to
        # return fIdx alongside the generated code
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto(); cols = self.columns; mode = self.mode
        if len(cols) == 0:
            if mode == 0: return f"{fIdx} = ({dataIdx}) => {dataIdx}.map((v) => parseFloat(v))", fIdx
            if mode == 1: return f"{fIdx} = ({dataIdx}) => {dataIdx}.map((v) => {{ const a = parseFloat(v); return a === a ? a : 0 }})", fIdx
            if mode == 2: return f"{fIdx} = ({dataIdx}) => {{ const ans = []; for (const v of {dataIdx}) {{ const a = parseFloat(v); if (a === a) ans.push(a); }}; return ans; }}", fIdx
        else: return f"""\
{fIdx} = ({dataIdx}) => {{
    const ans = [];
    for (const row of {dataIdx}) {{
        {'ans.push(row.map(parseFloat));' if mode == 0 else ''}
        {'ans.push(row.map(parseFloat).map((v) => (v === v ? v : 0)));' if mode == 1 else ''}
        {'const rowp = row.map(parseFloat);if (rowp.map((v) => v === v).every((v) => v)) ans.push(rowp);' if mode == 2 else ''}
    }}
    return ans;
}}""", fIdx
def _toInt(e) -> Union[int, None]: # _toInt
try: return int(float(e)) # _toInt
except: return None # _toInt
class toInt(BaseCli):
    blurb = "Converts an iterator into a list of ints"
    def __init__(self, *columns, mode=2):
        """Converts every row into an integer. Example::

    # returns [1, 3, -2]
    ["1", "3", "-2.3"] | toInt() | deref()

:param columns: if nothing, then will convert each row. If available, then
    convert all the specified columns
:param mode: different conversion styles

    - 0: simple ``int()`` function, fastest, but will throw errors if it can't be parsed
    - 1: if there are errors, then replace it with zero
    - 2: if there are errors, then eliminate the row

See also: :meth:`toFloat`"""
        self.columns = columns; self.mode = mode
    def __ror__(self, it):
        columns = self.columns; mode = self.mode
        if len(columns) == 0:
            if isinstance(it, np.ndarray): return it.astype(int)
            # fix: guard with hasTorch like the rest of the file, so the
            # isinstance() check doesn't run against the dep-stub when torch is absent
            if hasTorch and isinstance(it, torch.Tensor): return it.int()
            if mode == 0: return (int(e) for e in it)
            return it | _toop(_toInt, None, mode == 1, 0)
        else:
            if hasPandas and isinstance(it, pd.DataFrame):
                cols = [it[c] for c in list(it)]; nameGen = it.newColName(None)
                for c in columns: cols[c] = cols[c].copy().astype(int)
                return pd.DataFrame({getattr(c, "name", next(nameGen)):c for c in cols})
            # fix: default fill value is now the int 0, not 0.0 — this is the int
            # converter, and the no-columns branch above already used 0
            return it | cli.init.serial(*(_toop(_toInt, c, mode == 1, 0) for c in columns))
    def _jsF(self, meta):
        # fix: `mode` was read but never defined in this method (NameError on every
        # call); grab it from self. Also the no-columns mode==2 branch forgot to
        # return fIdx alongside the generated code
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto(); cols = self.columns; mode = self.mode
        if len(cols) == 0:
            if mode == 0: return f"{fIdx} = ({dataIdx}) => {dataIdx}.map((v) => parseInt(v))", fIdx
            if mode == 1: return f"{fIdx} = ({dataIdx}) => {dataIdx}.map((v) => {{ const a = parseInt(v); return a === a ? a : 0 }})", fIdx
            if mode == 2: return f"{fIdx} = ({dataIdx}) => {{ const ans = []; for (const v of {dataIdx}) {{ const a = parseInt(v); if (a === a) ans.push(a); }}; return ans; }}", fIdx
        else: return f"""\
{fIdx} = ({dataIdx}) => {{
    const ans = [];
    for (const row of {dataIdx}) {{
        {'ans.push(row.map(parseInt));' if mode == 0 else ''}
        {'ans.push(row.map(parseInt).map((v) => (v === v ? v : 0)));' if mode == 1 else ''}
        {'const rowp = row.map(parseInt);if (rowp.map((v) => v === v).every((v) => v)) ans.push(rowp);' if mode == 2 else ''}
    }}
    return ans;
}}""", fIdx
class toBytes(BaseCli):
    blurb="Converts several object types to bytes"
    def __init__(self, dataType=None):
        """Serializes the incoming object into bytes.
Strings are utf-8 encoded, PIL images are exported in the requested
format (jpg by default), objects exposing a ``_toBytes()`` hook delegate
to that hook, and anything else gets pickled with dill::

    "abc" | toBytes()                                # string -> utf-8 bytes
    torch.randn(200, 100) | toImg() | toBytes()      # image -> jpg bytes
    torch.randn(200, 100) | toImg() | toBytes("PNG") # image -> png bytes
    "some_file.mp3" | toAudio() | toBytes("mp3")     # audio -> mp3 bytes

.. admonition:: Custom datatype

    Classes can control their own serialization by implementing either
    ``_toBytes(self)`` or ``_toBytes(self, dataType)``::

        class custom1:
            def __init__(self, config=None): ...
            def _toBytes(self): return b"abc"
        class custom2:
            def __init__(self, config=None): ...
            def _toBytes(self, dataType):
                if dataType == "png": return b"123"
                else: return b"456"
        custom1() | toBytes()      # returns b"abc"
        custom2() | toBytes()      # returns b"456"
        custom2() | toBytes("png") # returns b"123"

    For intuitiveness, hooks should return bytes or an iterator of bytes

:param dataType: output format hint, e.g. "png"/"jpg" for images,
    "mp3"/"wav" for audio"""
        self.dataType = dataType
    def __ror__(self, it):
        if isinstance(it, str): return it.encode()
        if hasPIL and isinstance(it, PIL.Image.Image):
            buf = io.BytesIO()
            (it | toRgb()).save(buf, format=(self.dataType or "JPEG"))
            return buf.getvalue()
        if hasattr(it, "_toBytes"):
            # dispatch on the hook's arity (self excluded)
            nParams = len(inspect.getfullargspec(it._toBytes).args[1:])
            if nParams == 0: return it._toBytes()
            if nParams == 1: return it._toBytes(self.dataType)
            raise Exception(f"{it.__class__.__name__} have 2 or more arguments, which is unsupported")
        import dill; return dill.dumps(it)
mpld3 = k1lib.dep("mpld3", url="https://mpld3.github.io/") # toBytes
class DataUri:
    """Thin wrapper around a data-uri string ("data:<mime>;base64, ...")
that knows its own mime type and can render itself in a notebook"""
    def __init__(self, uri:str):
        self.uri = uri                           # full uri, e.g. "data:image/png;base64, ..."
        header = uri.split(";")[0]               # "data:image/png"
        self.mime = header.split(":")[-1]        # "image/png"
        self.mimeBase = self.mime.split("/")[0]  # "image"
    def _repr_html_(self):
        # images embed directly; html payloads are decoded and injected as-is
        if self.mimeBase == "image": return f"<img src=\"{self.uri}\"/>"
        if self.mime == "text/html": return base64.b64decode(self.uri.split("base64,")[-1]).decode()
    def __repr__(self):
        u = self.uri
        shortened = u[:75] + '...' if len(u) > 75 else u  # keep repr readable for long payloads
        return f"<DataUri mime='{self.mime}', self.uri='{shortened}'>"
def _dataUriHtml(it): return DataUri(f"data:text/html;base64, {base64.b64encode(it.encode()).decode()}") # _dataUriHtml
class toDataUri(BaseCli):
    blurb="Converts several object types into data uri scheme"
    def __init__(self):
        """Converts the incoming object into a data uri ("data:image/png;base64, ..."
or "data:text/html;base64, ..."). Mostly a building block for other tools
(:class:`toHtml`, :class:`toAnchor`) rather than something used directly::

    randomImg = cat("https://mlexps.com/ergun.png", False) | toImg()
    randomImg | toDataUri()              # DataUri object, .mime == "image/png"
    randomImg | toDataUri() | toHtml()   # '<img src="data:image/png;base64, ..."/>'
    randomImg | toDataUri() | toAnchor() # anchor tag opening the image in a new tab
"""
        # other clis (e.g. toHtml) flip this on to surface conversion errors instead of falling back
        self.throw = False
    def __ror__(self, it):
        if isinstance(it, str): return _dataUriHtml(it)
        if isinstance(it, DataUri): return it
        if hasPIL and isinstance(it, PIL.Image.Image):
            payload = it | toBytes(dataType="PNG") | cli.aS(base64.b64encode) | cli.op().decode()
            return DataUri(f"data:image/png;base64, {payload}")
        try: return DataUri(it._toDataUri())  # duck-typed hook for custom classes
        except Exception as e:
            if self.throw: raise Exception(f"toDataUri() called on an unfamiliar object, and the object doesn't implement _toDataUri(). Error: {e}")
            return _dataUriHtml(it | toHtml())  # last resort: render to html, then wrap
class toAnchor(BaseCli):
    blurb="Converts several object types into a html anchor tag"
    def __init__(self, text:str="click here"):
        """Turns the incoming object into an ``<a>`` tag that, when clicked,
shows the object's rendered html in a new browser tab::

    randomImg = cat("https://mlexps.com/ergun.png", False) | toImg()
    randomImg | toAnchor() # '<a href="data:image/png;base64, ..."></a>'

Some browsers open a blank tab at first; refreshing (F5/Ctrl+R) makes the
content show up

:param text: text to display inside of the anchor"""
        self.text = text
    def __ror__(self, it:str):
        uri = (it | toDataUri()).uri
        return k1lib.viz.Html(f"<a href=\"{uri}\" target=\"_blank\">{self.text}</a>")
class toHtml(BaseCli):
    blurb="Converts several object types to html"
    def __init__(self):
        """Converts several object types to html.
Example::

    # converts PIL image to html <img> tag
    torch.randn(200, 100) | toImg() | toHtml()
    # converts graphviz graph to svg text (which is essentially html)
    g = k1.digraph(); g(*"abc"); g(*"bcd"); g | toHtml()
    # converts plotly graphs to html
    import plotly.express as px; import pandas as pd
    df = pd.DataFrame({'x': [1, 2, 3, 4, 5], 'y': [10, 11, 12, 14, 15]})
    fig = px.line(df, x='x', y='y', title='Simple Line Chart')
    fig | toHtml()
    # converts matplotlib plot to image, and then to html. Do this if you want a static plot
    x = np.linspace(-2, 2); y = x**2
    plt.plot(x, x**2); plt.gcf() | toImg() | toHtml()
    # converts matplotlib plot to D3.js html sketch
    plt.plot(x, x**2); plt.gcf() | toHtml()
"""
        pass
    def __ror__(self, it): return k1lib.viz.Html(self._ror_(it))  # wrap so notebooks render the result
    def _ror_(self, it):
        # cascade: well-known library types first, then duck-typed hooks, finally plain repr
        if isinstance(it, str): return it
        if hasPlotly and isinstance(it, plotly.graph_objs._figure.Figure):
            out = io.StringIO(); it.write_html(out); out.seek(0); return out.read()
        # close the figure so it doesn't also get displayed by the notebook itself
        if isinstance(it, mpl.figure.Figure): res = mpld3.fig_to_html(it); plt.close(it); return res
        if hasGraphviz and isinstance(it, graphviz.Digraph):
            # render to a temp svg file, slurp it back in, then clean up
            import tempfile; a = tempfile.NamedTemporaryFile()
            it.render(a.name, format="svg");
            fn = f"{a.name}.svg"; im = cli.cat(fn) | cli.join("")
            try: os.remove(fn)
            except: pass  # best-effort cleanup only
            return Svg(im)
        try:
            res = it._repr_html_()  # jupyter display protocol
            if res: return res
        except: pass
        try:
            res = it._toHtml()  # k1lib-specific hook
            if res: return res
        except: pass
        try:
            # throw=True so failures here fall through to repr instead of recursing via toHtml()
            f = toDataUri(); f.throw = True
            res = (it | f)._repr_html_()
            if res: return res
        except: pass
        return it.__repr__()
    def _jsF(self, meta): return "", ""  # no-op in the JS transpilation backend
# optional rdkit integration: toMol/toSmiles are only defined (and exported) when rdkit is installed
try:
    from rdkit import Chem
    from rdkit.Chem import Draw
    from rdkit.Chem import AllChem
    from rdkit.Chem.Draw import IPythonConsole
    IPythonConsole.drawOptions.addAtomIndices = True  # show atom indices when drawing molecules in notebooks
    __all__ = [*__all__, "toMol", "toSmiles"]  # extend the public API only when available
    def toMol():
        """Smiles to molecule.
Example::

    "c1ccc(C)cc1" | toMol()"""
        return cli.aS(Chem.MolFromSmiles)
    def toSmiles():
        """Molecule to smiles.
Example::

    "c1ccc(C)cc1" | toMol() | toSmiles()"""
        return cli.aS(Chem.MolToSmiles)
except: pass
import unicodedata, hashlib # toHtml
def toAscii():
    """Converts complex unicode text to its base ascii form.
Example::

    "hà nội" | toAscii() # returns "ha noi"

Taken from https://stackoverflow.com/questions/2365411/convert-unicode-to-ascii-without-errors-in-python"""
    # bug fix: .encode('ascii', 'ignore') returns bytes, but the documented example
    # returns a str ("ha noi"), so decode back to text after stripping the accents
    return cli.aS(lambda word: unicodedata.normalize('NFKD', word).encode('ascii', 'ignore').decode())
def toHash() -> str:
    """Converts some string into some hash string.
Example::

    "abc" | toHash() # returns 'gASVJAAAAAAAAABDILp4Fr+PAc/qQUFA3l2uIiOwA2Gjlhd6nLQQ/2HyABWtlC4='

Why not just use the builtin function ``hash("abc")``? Because it generates different
hashes for different interpreter sessions, and that breaks many of my applications that
need the hash value to stay constant forever."""
    def _stableHash(msg:str) -> str:
        digest = hashlib.sha256()
        digest.update(f"{msg}".encode())
        return k1lib.encode(digest.digest())
    return cli.aS(_stableHash)
import csv
# settings.cli.toCsv.df: when True, toCsv() loads everything at once into a pd.DataFrame
settings.add("toCsv", k1lib.Settings().add("df", False, "if False, use csv.reader (incrementally), else use pd.read_csv (all at once, might be huge!)"), "conv.toCsv() settings")
class toCsv(BaseCli):
    blurb="Converts several object types into a table/dataframe"
    def __init__(self, allSheets=False):
        """Converts a csv file name into a table.
Example::

    "abc.csv" | toCsv()                         # table of values (Iterator[List[str]])
    "abc.csv" | toCsv()                         # pd.DataFrame, if 'settings.toCsv.df = True'
    "def.xlsx" | toCsv()                        # table of values in the first sheet
    "def.xlsx" | toCsv(True)                    # List[Sheet name (str), table of values]
    ["a,b,c,d", "1,2,3,4"] | toCsv() | deref()  # [['a', 'b', 'c', 'd'], ['1', '2', '3', '4']]

.. warning::

    This is pretty slow compared to just splitting by commas. If your
    dataset has no quoted commas and such, ``op().split(",").all()`` is
    much faster. If it does, consider converting it to a tsv once and
    splitting on tabs afterwards

:param allSheets: if input is an Excel sheet, whether to read in all sheets or
    just the first sheet. No effect if input is a normal csv file"""
        self.allSheets = allSheets
    def __ror__(self, fn:"str | Iterator[str|bytes]"):
        if not isinstance(fn, str): return csv.reader(fn)  # raw csv lines piped in directly
        path = os.path.expanduser(fn)
        if path.endswith((".xls", ".xlsx")):
            if self.allSheets: return pd.read_excel(path, sheet_name=None).items()
            return pd.read_excel(path)
        if settings.toCsv.df: return pd.read_csv(path)
        def rows():  # generator so the file stays open exactly as long as it's consumed
            with open(path) as f: yield from csv.reader(f)
        return rows()
class toYaml(BaseCli):
    blurb="Converts file name/yaml string to object and object to yaml string"
    def __init__(self, mode=None, safe=True):
        """Converts file name/yaml string to object and object to yaml string.
Example::

    "some_file.yaml" | toYaml() # returns python object
    cat("some_file.yaml") | join("\\n") | toYaml(1) # returns python object
    {"some": "object", "arr": [1, 2]} | toYaml() # returns yaml string. Detected object coming in, instead of string, so will convert object into yaml string

:param mode: None (default) for figure it out automatically,
    0 for loading from file name, 1 for loading from raw yaml string,
    2 for converting object to yaml string
:param safe: if True, always use safe_load() instead of load()"""
        self.mode = mode; self.safe = safe
    def _load(self, stream):
        # one central loader so the file and raw-string branches behave identically.
        # Old pyyaml versions have no FullLoader / don't take a Loader argument, hence the fallback
        try: return yaml.safe_load(stream) if self.safe else yaml.load(stream, yaml.FullLoader)
        except (AttributeError, TypeError): return yaml.load(stream) # for previous versions
    def __ror__(self, it):
        mode = self.mode
        # anything that isn't a string can only be serialized, not parsed
        if not isinstance(it, str) or mode == 2: return yaml.dump(it)
        fn = os.path.expanduser(it)
        # bug fix: mode=None is documented as "figure it out automatically", but used to
        # always treat the input as a file name and crash on raw yaml strings. Now it
        # only goes down the file path when such a file actually exists
        if mode == 0 or (mode is None and os.path.exists(fn)):
            with open(fn) as f: return self._load(f)
        return self._load(it)
import validators, shutil, html, io, os; pydub = k1lib.dep("pydub", url="https://github.com/jiaaro/pydub") # toYaml
class Audio:
    """Thin wrapper around a pydub AudioSegment, used by :class:`toAudio`"""
    def __init__(self, raw:"pydub.audio_segment.AudioSegment"): self.raw = raw  # underlying pydub segment
    def resample(self, rate) -> "Audio":
        """Resamples the audio"""
        if rate:
            self.raw = self.raw.set_frame_rate(rate)
        # NOTE(review): 2.15e9 ~ 2**31, presumably normalizing 32-bit samples to roughly
        # [-1, 1] — confirm this holds for other sample widths
        self.data = np.array(self.raw.get_array_of_samples())/2.15e9
        self.rate = self.raw.frame_rate  # cached frame rate after resampling
        return self
    # NOTE(review): relies on pydub's export() seeking the buffer back to 0 before
    # returning, otherwise read() would return b"" — confirm against pydub docs
    def _toBytes(self, dataType) -> bytes: f = io.BytesIO(); self.raw.export(f, format=(dataType or "wav")); return f.read()
    def __repr__(self): return f"<Audio duration={k1lib.fmt.time(self.raw.duration_seconds)} rate={self.raw.frame_rate}>"
    def __len__(self): return int(self.raw.frame_count())  # number of frames
    def __getitem__(self, slice_):
        # only slices are supported: returns a new Audio over the selected frames
        if not isinstance(slice_, slice): return None
        data = np.array(self.raw.get_array_of_samples()) | cli.batched(self.raw.channels) | cli.op()[slice_]
        return Audio(pydub.AudioSegment(data.tobytes(), frame_rate=self.raw.frame_rate, sample_width=self.raw.sample_width, channels=self.raw.channels))
    def _repr_html_(self): # plays a short sample, first 10s or sth like that
        return f"{html.escape(self.__repr__())}<br>{self.raw[:10000]._repr_html_()}"
class toAudio(BaseCli):
    blurb="Reads audio from either a file or a URL or from bytes"
    def __init__(self, rate=None):
        """Reads audio from either a file or a URL or from bytes directly.
Example::

    au = "some_file.wav" | toAudio() # notebook display previews the first ~10 seconds
    au | toBytes()        # exports audio as .wav file
    au | toBytes("mp3")   # exports audio as .mp3 file
    au.resample(16000)    # resamples audio to new rate
    au | head(0.1)        # new Audio with the first 10% of the track
    au | splitW(8, 2)     # splits into 2 Audios covering 80%/20% of the track
    au.raw                # internal pydub.AudioSegment object

Also works on Youtube/mp3 links (needs the yt-dlp binary) and on raw bytes::

    "https://www.youtube.com/watch?v=FtutLA63Cp8" | toAudio()
    cat("some_file.wav", False) | toAudio()

:param rate: if given, resample the audio to this frame rate right away"""
        self.rate = rate
    def __ror__(self, it:"str|bytes") -> Audio:
        tmp = True  # whether `fn` is a temp file we created and must delete afterwards
        if isinstance(it, bytes): fn = it | cli.file()
        elif isinstance(it, str):
            expanded = os.path.expanduser(it)
            if os.path.exists(expanded): fn = expanded; tmp = False
            elif validators.url(it):
                if not shutil.which("yt-dlp"): raise Exception(f"'{it}' looks like a link, but the required 'yt-dlp' binary is not found. Please install it by doing `pip install yt-dlp`")
                fn = None | cli.cmd(f"yt-dlp -o - -x {it}", mode=0, text=False) | cli.item() | cli.file()
            else: raise Exception(f"The file '{it}' does not exist, and it doesn't look like a URL")
        else: raise Exception(f"Unknown {type(it)} audio type")
        audio = Audio(pydub.AudioSegment.from_file(fn)).resample(self.rate)
        if tmp: os.remove(fn)
        return audio
import datetime; from datetime import datetime as dt
# lazy dependency: used by toUnix/toIso for timezone lookup and flexible date parsing
dateutil = k1lib.dep("dateutil", url="https://dateutil.readthedocs.io")
class toUnix(BaseCli):
    blurb="Converts to unix timestamp"
    def __init__(self, tz:"str | dateutil.tz.tz.tzfile"=None, mode:int=0):
        """Tries to convert anything piped in into a unix timestamp. If it can't,
returns None or the current timestamp (depending on mode). Example::

Local time zone independent::

    "2023" | toUnix()                       # 2023 seconds after epoch (raw numbers pass through)
    "2023-11-01T00Z" | toUnix()             # midnight Nov 1st 2023 GMT
    "2023-11-01T00:00:00-04:00" | toUnix()  # midnight Nov 1st 2023 EST
    "2023-11-01" | toUnix("US/Pacific")     # midnight Nov 1st 2023 PST
    "2023-11-01" | toUnix("UTC")            # midnight Nov 1st 2023 UTC

Strings without an explicit zone are interpreted in the host's local timezone.
Quite versatile in what it can parse, but mainly meant for computing time
differences, so exact-zone guarantees are loose

:param tz: Timezone, like "US/Eastern", "US/Pacific". If not specified, then assumes local
    timezone. Get all available timezones by executing ``toUnix.tzs()``
:param mode: if 0, then returns None if can't convert, to catch errors quickly. If 1, then returns current timestamp instead"""
        self.mode = mode
        if isinstance(tz, dateutil.tz.tz.tzfile): self.tz = tz; return
        self.tz = dateutil.tz.gettz(tz)
        if self.tz is None and tz: raise Exception(f"Timezone '{tz}' not found. You can get a list of all available timezones at `toUnix.tzs()`")
    @staticmethod
    def tzs(): return list(dateutil.zoneinfo.get_zonefile_instance().zones.keys())
    def __ror__(self, t):
        if isinstance(t, datetime.datetime): return t.timestamp()
        if hasattr(t, "dtype") and isinstance(t.dtype, np.dtypes.DateTime64DType): return t.astype(int)
        try: return float(t)  # raw numbers (and numeric strings) pass straight through
        except:
            try:
                parsed = dateutil.parser.parse(t)
                if self.tz: parsed = parsed.replace(tzinfo=self.tz)
                return parsed.timestamp()
            except: return time.time() if self.mode else None
class toIso(BaseCli):
    blurb="Converts unix timestamp to a human readable time string"
    def __init__(self, tz:"str | dateutil.tz.tz.tzfile"=None):
        """Converts unix timestamp into ISO 8601 string format.
Example::

    1701382420 | toIso()            # '2023-11-30T17:13:40' in EST time
    1701382420 | toIso() | toUnix() # round-trips back to 1701382420
    1701382420.123456789 | toIso()  # still '2023-11-30T17:13:40' (sub-second dropped)

Output depends on the host's timezone unless one is given explicitly::

    1701382420 | toIso("UTC")        # '2023-11-30T22:13:40'
    1701382420 | toIso("US/Pacific") # '2023-11-30T14:13:40'

:param tz: Timezone, like "US/Eastern", "US/Pacific". If not specified, then assumes local
    timezone. Get all available timezones by executing ``toUnix.tzs()``"""
        if isinstance(tz, dateutil.tz.tz.tzfile): self.tz = tz; return
        self.tz = dateutil.tz.gettz(tz)
        if self.tz is None and tz: raise Exception(f"Timezone '{tz}' not found. You can get a list of all available timezones at `toUnix.tzs()`")
    def __ror__(self, it):
        d = dt.fromtimestamp(it)
        if self.tz: d = d.astimezone(self.tz)
        return d.strftime('%Y-%m-%dT%H:%M:%S')
class toYMD(BaseCli):
    blurb="Converts unix timestamp into tuple (year, month, day, hour, minute, second)"
    def __init__(self, idx=None, mode=int):
        """Converts unix timestamp into tuple (year, month, day, hour, minute, second).
Example::

    1701382420 | toYMD()         # [2023, 11, 30, 17, 13, 40] in EST timezone
    1701382420 | toYMD(0)        # 2023
    1701382420 | toYMD(1)        # 11
    1701382395 | toYMD(mode=str) # ['2023', '11', '30', '17', '13', '15']

:param idx: if specified, take the desired element only. If 0, then take year, 1, then month, etc.
:param mode: either int or str. If str, then returns nicely adjusted numbers"""
        self.idx = idx; self.mode = mode
    def __ror__(self, it):
        d = dt.fromtimestamp(it)
        parts = [d.year, d.month, d.day, d.hour, d.minute, d.second]
        if self.mode != int:
            # zero-pad everything but the year, to match e.g. '2023-11-04' style components
            parts = [f"{d.year}", *(f"{v}".rjust(2, "0") for v in parts[1:])]
        return parts if self.idx is None else parts[self.idx]
# settings.cli.toLinks: controls how toLinks() tokenizes lines and which url protocols it recognizes
settings.add("toLinks", k1lib.Settings()\
    .add("splitChars", ["<br>", "<div ", *"\n\t<> ,;"], "characters/strings to split the lines by, so that each link has the opportunity to be on a separate line, so that the first instance in a line don't overshadow everything after it")\
    .add("protocols", ["http", "https", "ftp"], "list of recognized protocols to search for links, like 'http' and so on"), "conv.toLinks() settings");
class toLinks(BaseCli):
    blurb="Extracts links and urls from a paragraph"
    def __init__(self, f=None):
        """Extracts links and urls from a paragraph.
Example::

    paragraph = [
        "http://a.c",
        "http://a2.c some other text in between <a href='http://b.d'>some link</a> fdvb"
    ]
    # returns {'http://a.c', 'http://a2.c', 'http://b.d'}
    paragraph | toLinks() | deref()

If the input is a string instead of an iterator of strings, then
it will :meth:`~k1lib.cli.inp.cat` it first, then look for links
inside the result. For example::

    "https://en.wikipedia.org/wiki/Cheese" | toLinks()

At the time of writing, that returns a lot of links::

    {'/wiki/Rind-washed_cheese',
     '#cite_ref-online_5-7',
     'https://web.archive.org/web/20160609031000/http://www.theguardian.com/lifeandstyle/wordofmouth/2012/jun/27/how-eat-cheese-and-biscuits',
     'https://is.wikipedia.org/wiki/Ostur',
     '/wiki/Meat_and_milk',
     '/wiki/Wayback_Machine',
     '/wiki/File:WikiCheese_-_Saint-Julien_aux_noix_01.jpg',
     'https://gv.wikipedia.org/wiki/Caashey',
     '/wiki/Eyes_(cheese)',
     '/wiki/Template_talk:Condiments',
     '#Pasteurization',
     '/wiki/Tuscan_dialect',
     '#cite_note-23',
     '#cite_note-aha2017-48',

So, keep in mind that lots of different things can be considered a
link. That includes absolute links ('https://gv.wikipedia.org/wiki/Caashey'),
relative links within that particular site ('/wiki/Tuscan_dialect'), and
relative links within the page ('#Pasteurization').

How it works underneath is that it's looking for a string like "https://..."
and a string like "href='...'", which usually have a link inside. For the
first detection style, you can specify extra protocols that you want to
search for using ``settings.cli.toLinks.protocols = [...]``.

Also, this will detect links nested within each other multiple times.
For example, the link 'https://web.archive.org/web/20160609031000/http://www.theguardian.com/lifeandstyle/wordofmouth/2012/jun/27/how-eat-cheese-and-biscuits'
will appear twice in the result, once as itself, but also 'https://www.theguardian.com/lifeandstyle/wordofmouth/2012/jun/27/how-eat-cheese-and-biscuits'

Note that if you really try, you will be able to find an example where this won't
work, so don't expect 100% reliability. But for most use cases, this should perform
splendidly.

:param f: optional cli applied to the stream of found links before they're collected into a set"""
        self.f = f or cli.iden()
        chars = " \t,;" # random characters to split, so that the first instance in a line doesn't overshadow the ones after
        # splits each line on html tags / punctuation so every link starts its own fragment
        self.preprocess = cli.serial(*[(cli.op().split(ch).all() | cli.joinSt()) for ch in settings.toLinks.splitChars])
        protocols = "|".join([f"({p})" for p in settings.toLinks.protocols])
        # "http://..." style absolute links
        self.g = cli.grep(f"(?P<g>({protocols})" + "://[^\(\)\[\]\<\>\{\}\'\" ]*)", extract="g")
        # href="..." / href='...' style links (both quoting styles)
        self.href = cli.grep('href="(?P<g>.+)"', extract="g") & cli.grep("href='(?P<g>.+)'", extract="g") | cli.joinSt()
        self.post = cli.joinSt() | cli.aS(set)  # merges both detectors' results and dedups
    def __ror__(self, it):
        # duck-typed hook: objects can supply their own link extraction, optionally taking f
        if hasattr(it, "_toLinks"): return it._toLinks(self.f) if len(inspect.getfullargspec(it._toLinks).args) == 2 else it._toLinks()
        host = ""
        if isinstance(it, str): host = it; it = cli.cat(it) # reads the website first
        it = it | self.preprocess | cli.aS(list)  # materialize: consumed by both detectors below
        return it | self.href & self.g | self.post | self.f | cli.aS(set)
class toMovingAvg(BaseCli):
    blurb="Smoothes out sequential data using some momentum and debias values"
    def __init__(self, col:int=None, alpha=0.9, debias=True, v:float=0, dt:float=1):
        """Smoothes out sequential data using momentum.
Example::

    # returns [4.8, 4.62, 4.458]. 4.8 because 0.9*5 + 0.1*3 = 4.8, and so on
    [3, 3, 3] | toMovingAvg(v=5, debias=False) | deref()

Sometimes you want to ignore the initial value, then you can turn on debias mode::

    x = np.linspace(0, 10, 100); y = np.cos(x)
    plt.plot(x, y)
    plt.plot(x, y | toMovingAvg(debias=False) | deref())
    plt.plot(x, y | toMovingAvg(debias=False, alpha=0.95) | deref())
    plt.plot(x, y | toMovingAvg(debias=True) | deref())
    plt.plot(x, y | toMovingAvg(debias=True, alpha=0.95) | deref())
    plt.legend(["Signal", "Normal - 0.9 alpha", "Normal - 0.95 alpha", "Debiased - 0.9 alpha", "Debiased - 0.95 alpha"], framealpha=0.3)
    plt.grid(True)

.. image:: ../images/movingAvg.png

As you can see, normal mode still has the influence of the initial value at
0 and can't rise up fast, whereas the debias mode will ignore the initial
value and immediately snaps to the first value.

:param col: column to apply moving average to
:param alpha: momentum term
:param debias: whether to turn on debias mode or not
:param v: initial value, doesn't matter in debias mode
:param dt: pretty much never used, hard to describe, belongs to debias mode, checkout source code for details"""
        self.col = col; self.initV = v; self.alpha = alpha; self.debias = debias; self.dt = dt
        if debias and v != 0: raise Exception("Debias mode activated! This means that the initial value doesn't matter, yet you've specified one")
        if alpha > 1 or alpha < 0: raise Exception("Alpha is outside the [0, 1] range. which does not make sense")
    def __ror__(self, it):
        col = self.col
        if hasPandas and isinstance(it, pd.DataFrame):
            if col is None: raise Exception("toMovingAvg(col=None) applied on a DataFrame doesn't make much sense, does it?")
            # smooth just that column, then plug it back into the frame
            return it.replaceCol(list(it)[col], list(it[list(it)[col]] | toMovingAvg(None, self.alpha, self.debias, self.initV, self.dt)))
        def gen():
            m = value = self.initV; alpha = self.alpha
            if self.debias:
                dt = self.dt; t = 1; tooSmall = False
                if col is None:
                    for v in it:
                        m = m * alpha + v * (1 - alpha)
                        if tooSmall: yield m # skips complex exponential calculation once it's small enough to speed things up
                        else:
                            exp = alpha**t; value = m / (1 - exp)
                            tooSmall = 10*exp < (1-alpha); t += dt; yield value
                else:
                    for row in it:
                        m = m * alpha + row[col] * (1 - alpha)
                        if tooSmall: yield [*row[:col], m, *row[col+1:]]
                        else:
                            exp = alpha**t; value = m / (1 - exp)
                            # bug fix: this branch used `10**exp < (1-alpha)`; 10**exp is always
                            # >= 1 for exp in (0, 1], so the fast path above could never activate.
                            # `10*exp` matches the col=None branch's intended threshold
                            tooSmall = 10*exp < (1-alpha); t += dt; yield [*row[:col], value, *row[col+1:]]
            else:
                if col is None:
                    for v in it: m = m * alpha + v * (1 - alpha); yield m
                else:
                    for row in it:
                        m = m * alpha + row[col] * (1 - alpha)
                        yield [*row[:col], m, *row[col+1:]]
        return gen()
class toCm(BaseCli):
    blurb="Converts the specified column to a bunch of color values, and adds a matplotlib colorbar automatically"
    def __init__(self, col:int, cmap=None, title:str=None):
        """Converts the specified column to a bunch of color
values, and adds a matplotlib colorbar automatically. "cm" = "color map". Example::

    import matplotlib.cm as cm
    exps = [1, 2, 3, 4, 5]
    x = np.linspace(-2, 2)
    data = exps | apply(lambda exp: [exp, x, x**exp]) | deref()
    # without toCm(), plots fine, demonstrates underlying mechanism, but doesn't allow plotting a separate colorbar
    data | normalize(0, mode=1) | apply(cm.viridis, 0) | ~apply(lambda c,x,y: plt.plot(x, y, color=c)) | ignore()
    # with toCm(), draws a colorbar automatically
    data | toCm(0, cm.viridis, "Exponential") | ~apply(lambda c,x,y: plt.plot(x, y, color=c)) | ignore()

.. image:: ../images/toCm.png

Functionality is kind of niche, but I need this over and over
again, so have to make it

:param col: column to convert float/int to color (tuple of 4 floats)
:param cmap: colormap to use. If not specified, defaults to ``cm.viridis``
:param title: title of the colorbar, optional"""
        self.col = col; self.cmap = cmap or cm.viridis; self.title = title
    def __ror__(self, it):
        col = self.col; cmap = self.cmap; title = self.title; it = init.dfGuard(it)  # unwraps DataFrames into plain tables
        if col is None:
            # whole input is the scalar series: normalize to [0, 1] and map through the cmap
            if not isinstance(it, k1lib.settings.cli.arrayTypes): it = list(it)  # materialize: consumed twice (min/max scan + normalize)
            plt.colorbar(cm.ScalarMappable(norm=plt.Normalize(*it | cli.toMin() & cli.toMax()), cmap=cmap), ax=plt.gca(), label=title)  # side effect: adds colorbar to current axes
            return it | cli.normalize(None, 1) | cli.apply(cmap)
        else:
            it = it | cli.deref(2)  # materialize the table for the same two-pass reason
            plt.colorbar(cm.ScalarMappable(norm=plt.Normalize(*it | cli.cut(col) | cli.toMin() & cli.toMax()), cmap=cmap), ax=plt.gca(), label=title)
            return it | cli.normalize(col, 1) | cli.apply(cmap, col)
PyPDF2 = k1lib.dep("PyPDF2", url="https://pypdf2.readthedocs.io/") # toCm
class Pdf:
    """Handle to a pdf file, used by :class:`toPdf`. Pickle-friendly: the open
file handle and reader are dropped on serialize and lazily re-opened on next use"""
    def __init__(self, fn):
        # fn: path to the pdf file on disk
        self.fn = fn; self._handle = None; self._open()
    def _open(self):
        # lazily (re)opens the underlying file + PyPDF2 reader; no-op if already open
        if self._handle is not None: return
        self._handle = open(self.fn, 'rb')
        self._reader = PyPDF2.PdfReader(self._handle)
        self._npages = len(self._reader.pages);
    def __iter__(self): return (PdfPage(self, i) for i in range(len(self)))
    def __getitem__(self, s):
        # supports single index and slice, returning PdfPage or a list of them
        if isinstance(s, slice): return [PdfPage(self, i) for i in range(len(self))[s]]
        else: return PdfPage(self, s)
    def __getstate__(self): d = dict(self.__dict__); d["_handle"] = None; d["_reader"] = None; return d # open handles can't be pickled
    def __setstate__(self, d): self.__dict__.update(d) # handle/reader stay None until someone calls _open() again
    def __repr__(self): return f"<Pdf #pages={len(self)} '{self.fn}'>"
    def __del__(self):
        if self._handle: self._handle.close()  # best-effort cleanup of the file handle
    def __len__(self): return self._npages  # number of pages, cached at open time
class PdfPage:
    """Lightweight pointer to a single page of a :class:`Pdf`, supporting
``| cat()`` (text extraction) and ``| toImg()`` (rasterization via pdftoppm)"""
    def __init__(self, pdf:Pdf, i:int):
        # pdf: parent Pdf handle; i: 0-indexed page number
        self.pdf = pdf; self.i = i
    def __repr__(self): return f"<PdfPage page={self.i} #pages={len(self.pdf)} fn='{self.pdf.fn}'>"
    def _cat(self):
        # make sure the parent's reader exists (it's dropped on pickling)
        self.pdf._open()
        # cleanup: previously this also opened the pdf into an unused handle `o` and
        # recorded an unused `startTime`; the reader keeps its own handle, so both were dead code
        page = self.pdf._reader.pages[self.i]
        return page.extract_text().split("\n")
    def _toImg(self, **kwargs):
        # rasterizes just this page with the external pdftoppm binary, via a temp file
        k1lib.depCli("pdftoppm"); fn2 = b"" | cli.file(); fn = self.pdf.fn.replace("'", "\\'"); i = self.i
        None | cli.cmd(f"pdftoppm -f {i+1} -l {i+1} -jpeg '{fn}' {fn2} -singlefile") | cli.deref()
        im = f"{fn2}.jpg" | cli.toImg(); os.remove(f"{fn2}.jpg"); os.remove(fn2); return im
_pdf_initialized = [False] # one-element list so the flag is mutable from inside _pdf_init
def _pdf_init():
    # Register Pdf/PdfPage as atomic cli types, exactly once per process
    if not _pdf_initialized[0]:
        _pdf_initialized[0] = True
        k1lib.cli.init.addAtomic(Pdf); k1lib.cli.init.addAtomic(PdfPage)
class toPdf(BaseCli):
    blurb="Reads a pdf file to a managed object and can do lots of downstream tasks from there"
    def __init__(self):
        """Reads a pdf file. Can do lots of downstream tasks.
Example::
    pdf = "someFile.pdf" | toPdf()
    len(pdf) # get number of pages
    pdf[2] | cat() # get text content of 2nd (0-indexed) page
    pdf[2] | toImg() # converts 2nd page to an image
"""
        # make sure Pdf/PdfPage are registered as atomic cli types first
        _pdf_init()
    def __ror__(self, it) -> Pdf:
        # wrap the incoming file name in a managed Pdf handle
        return Pdf(it)
class toDist(BaseCli):
    blurb="Calculates the euclidean distance of the input points"
    def __init__(self, norm=2):
        """Calculates the euclidean distance of the input points.
Example::
    a = np.random.randn(3)
    b = np.random.randn(3)
    [a, b] | toDist() # returns distance between those 2 points
Essentially just ((a-b)**2).sum()**0.5. But I kept needing this over and
over again so gotta make it into a separate cli.

:param norm: p value of the p-norm. 2 (default) is euclidean, 1 is manhattan"""
        self.norm = norm
    def __ror__(self, it):
        a,b = it
        # abs() so odd norms don't let negative differences cancel out
        # (without it, norm=1 was not the manhattan distance). For the
        # default norm=2 the abs() is a no-op
        return (abs(a-b)**self.norm).sum()**(1/self.norm)
class toAngle(BaseCli):
    blurb="Calculates the angle between 2 vectors"
    def __init__(self, radians=True):
        """Calculates the angle between 2 vectors.
Example::
    a = np.random.randn(3)
    b = np.random.randn(3)
    [a, b] | toAngle() # returns angle between those 2 vectors

:param radians: if True (default), returns the angle in radians, else in degrees"""
        self.radians = radians; self.mult = 1 if radians else 180/math.pi
    def __ror__(self, it):
        a,b = it; la = (a**2).sum()**0.5; lb = (b**2).sum()**0.5
        # clamp to [-1, 1]: float rounding can push the cosine of (anti)parallel
        # vectors slightly outside acos's domain, which raises ValueError
        cosv = max(-1.0, min(1.0, a@b/la/lb))
        return math.acos(cosv)*self.mult
[docs]class idxsToNdArray(BaseCli): # converts a sparse list of points into a dense numpy array
blurb="Converts indices (aka point cloud) to numpy array" # toFileType-style blurb shown in cli help
[docs] def __init__(self, ds:"tuple[int]"=None, n:int=None): # idxsToNdArray
"""Converts indices (aka point cloud) to numpy array.
Example::
    [[1,2], [2,3]] | idxsToNdArray() # returns np.array([[0, 0, 0, 0], [0, 0, 1, 0], [0, 0, 0, 1]])
    [[1,2], [2,3]] | idxsToNdArray(n=2) # returns np.array([[0, 0, 0, 0], [0, 0, 1, 0], [0, 0, 0, 1]])
    [[1,2], [2,3]] | idxsToNdArray(ds=[3,4]) # returns np.array([[0, 0, 0, 0], [0, 0, 1, 0], [0, 0, 0, 1]])
So, the standard use case is that you have a point cloud (points [1,2] and [2,3]) and you want
to get the dense array with those points filled in. Then you can do it with this function. Notice
how in all 3 examples, the points are marked with a 1. You can specify either the dense array's
shape using parameter ".ds", or just the number of dimensions with parameter ".n". If you specify
neither then it will auto figure that out, but the final shape might not be what you wanted.
Let's see some other use cases:
    [[1,2,3], [2,3,4]] | idxsToNdArray() | shape() # returns (3, 4, 5)
    [[1,2,3], [2,3,4]] | idxsToNdArray(n=2) # returns np.array([[0, 0, 0, 0], [0, 0, 3, 0], [0, 0, 0, 4]])
    [[1,2,3], [2,3,4]] | idxsToNdArray(n=1) # returns np.array([[0, 0], [2, 3], [3, 4]])
    [[1,2,3,4], [2,3,4,5]] | idxsToNdArray(n=2) # returns np.array([[[0, 0], [0, 0], [0, 0], [0, 0]], [[0, 0], [0, 0], [3, 4], [0, 0]], [[0, 0], [0, 0], [0, 0], [4, 5]]])
In the first example, if you don't specify the dimensions, it will return a 3d array, and
the selected points will have the value 1. But if you insist that it should have 2 dimensions
only, and the remaining columns should be the selected points' values, then you can either limit
.n, or specify the shape .ds but only has length of 2. Notice how the second example got filled
in by values 3 and 4 and not 1.

:param ds: dimensions (shape) of the output array
:param n: number of dimensions""" # idxsToNdArray
self.ds = ds; self.n = n # idxsToNdArray
if ds is not None and n is not None and len(ds) != n: raise Exception("Can specify either .ds or .n only. .n will be inferred from .ds") # .ds and .n must agree when both are given
[docs] def __ror__(self, it): # idxsToNdArray
n = self.n; ds = self.ds; arr = None # n: #index columns, ds: output shape, F: #feature columns
it = init.dfGuard(it) | cli.deref(2) | cli.toNdArray() # materialize input into a single 2d numpy array
if len(it.shape) != 2: raise Exception("Input have to be a 2d array") # idxsToNdArray
if n is None and ds is None: n = len(it[0]); F = 1; ds = it.T | cli.toMax().all() | cli.op().astype(int)+1 # neither given: all columns are indices, infer shape from the max index per column
elif n is not None: ds = it[:,:n].T | cli.toMax().all() | cli.op().astype(int)+1; F = len(it[0]) - n # .n given: first n columns are indices, the remaining F columns are values
elif ds is not None: n = len(ds); F = len(it[0]) - n # .ds given: its length determines n
if len(it[0]) == n: it = np.hstack([it, np.ones(it.shape[0])[:,None]]) # pure-index input: append a ones column to serve as the fill value
sel = tuple(it[:,:n].T.astype(int).tolist()) # fancy-index tuple selecting each point's cell in the dense array
if F > 1: arr = np.zeros((*ds,F)); arr[sel] = it[:,n:] # multiple value columns -> trailing axis of size F
else: arr = np.zeros(ds); arr[sel] = it[:,n] # single value column (possibly the appended ones)
return arr # idxsToNdArray
_toFileType_tmpFile = [None] # process-wide scratch-file base path, created once by the first toFileType() instance
[docs]class toFileType(BaseCli): # shells out to the `file` command-line program
blurb="Grab file type of a file or file contents (bytes)" # toFileType
[docs] def __init__(self): # toFileType
"""Grab file type of a file or file contents.
Example::
    # returns "PNG image data, 1024 x 1365, 8-bit/color RGBA, non-interlaced"
    "some_image.png" | toFileType()
    # returns "JPEG image data, JFIF standard 1.01, aspect ratio, density 1x1, segment length 16, baseline, precision 8, 1024x1365, components 3"
    "some_image.png" | toImg() | toBytes() | toFileType()
This does take quite a while to execute, up to 42ms/file, so if you're doing it a lot, would
suggest you use :class:`~k1lib.cli.modifier.applyMp` or something like that. Internally, this
will call the command line program ``file`` and returns its results, so this is just a convenience
cli.""" # toFileType
if _toFileType_tmpFile[0] is None: _toFileType_tmpFile[0] = b"" | cli.file() # lazily allocate the shared temp file base for the bytes branch below
self.autoInc = k1lib.AutoIncrement() # per-instance counter; combined with the pid to keep temp file names unique
[docs] def __ror__(self, it): # toFileType
if isinstance(it, str): # path branch: run `file` directly on the path
fn = os.path.expanduser(it); it = fn.replace("'", """'"'"'""") # POSIX shell quoting for the single-quoted argument
res = None | cli.cmd(f"file '{it}'") | cli.item() # toFileType
elif isinstance(it, bytes): # bytes branch: dump contents to a unique temp file first
fn = f"{_toFileType_tmpFile[0]}_{os.getpid()}_{self.autoInc()}" # pid + counter avoids collisions across processes and calls
it | cli.file(fn); res = None | cli.cmd(f"file {fn}") | cli.item(); os.remove(fn) # write, inspect, clean up
else: raise Exception("toFileType() only accepts either path (string) or file content (bytes)") # toFileType
return res.replace(f"{fn}: ", "") # `file` echoes the (unescaped) file name back; strip that prefix