Source code for k1lib.cli.conv

This is for all short utilities that convert from one data type to another. They
might feel like they have different styles, as :class:`toFloat` converts an object iterator to
a float iterator, while :class:`toImg` converts a single image url to a single PIL image,
whereas :class:`toSum` converts a float iterator into a single float value.

The general convention is, if the intended operation sounds simple (convert to floats,
strings, types, ...), then most likely it will convert iterator to iterator, as you
can always use the function directly if you only want to apply it on 1 object.

If it sounds complicated (convert to PIL image, tensor, ...) then most likely it will
convert object to object. Lastly, there are some where it just feels right to input
an iterator and output a single object (like getting max, min, std, mean values)."""
__all__ = ["toNdArray", "toTensor", "toRange", "toList",
           "toSum", "toProd", "toAvg", "toMean", "toStd", "toMedian", "toMax", "toMin", "toArgmin", "toArgmax",
           "toImg", "toRgb", "toRgba", "toGray", "toDict",
           "toFloat", "toInt", "toBytes", "toDataUri", "toAnchor", "toHtml",
           "toAscii", "toHash", "toCsv", "toYaml", "Audio", "toAudio", "toUnix", "toIso", "toYMD", "toLinks",
           "toMovingAvg", "toCm", "Pdf", "toPdf", "toDist", "toAngle", "idxsToNdArray", "toFileType"]
import re, k1lib, math, os, numpy as np, io, json, base64, unicodedata, inspect, time
from k1lib.cli.init import BaseCli, T, yieldT; import k1lib.cli as cli, k1lib.cli.init as init
# Lazily-resolved optional dependencies, following the same `k1lib.dep.<name>` pattern for each.
# NOTE(review): the right-hand side of `cm =` was missing in this copy of the file; it is
# restored here as `k1lib.dep.cm` by analogy with the neighbouring assignments - confirm upstream.
from k1lib.cli.typehint import *; mpl = k1lib.dep.mpl; plt = k1lib.dep.plt; yaml = k1lib.dep.yaml; pd = k1lib.dep.pd; cm = k1lib.dep.cm
from collections import deque, defaultdict; from typing import Iterator, Any, List, Set, Tuple, Dict, Callable, Union
settings = k1lib.settings.cli; imgkit = k1lib.dep("imgkit", url="")
# Optional dependencies. Each `has*` flag records whether the package imported cleanly, so
# that the clis below can guard their isinstance() checks. `except Exception` (rather than a
# bare `except`) keeps KeyboardInterrupt/SystemExit propagating while still tolerating
# packages that fail to import for reasons other than ImportError.
try: import PIL; import PIL.Image; hasPIL = True
except Exception: hasPIL = False
try: import torch; hasTorch = True
except Exception: torch = k1lib.dep.torch; hasTorch = False
try: import rdkit; hasRdkit = True
except Exception: hasRdkit = False
try: import graphviz; hasGraphviz = True
except Exception: hasGraphviz = False
# NOTE(review): the module name was missing from `import ... as px` in this copy of the file;
# `plotly.express` is the conventional target of the `px` alias - confirm upstream.
try: import plotly; import plotly.express as px; hasPlotly = True
except Exception: hasPlotly = False
# touching `pd.core` makes sure this is the real pandas package; on success `pd` is rebound to it
try: import pandas as pd; pd.core; hasPandas = True
except Exception: hasPandas = False
class toNdArray(BaseCli):
    blurb="Converts several data types to numpy.ndarray"
    def __init__(self, dtype=None):
        """Converts generator/several data types to :class:`numpy.ndarray`.
        Essentially ``np.array(list(it))``. Can convert PIL Image. Example::

            # returns array([0, 1, 2])
            range(3) | toNdArray()
            # returns array([0., 1., 2.], dtype=float32)
            range(3) | toNdArray(np.float32)

        Objects exposing a ``_toNdArray(self)`` or ``_toNdArray(self, dtype)``
        method are converted by calling that method.

        :param dtype: if specified, the resulting array is cast to this dtype"""
        self.dtype = dtype

    def _cast(self, arr):
        """Casts ``arr`` to the configured dtype, if any."""
        # `is not None` instead of truthiness: np.dtype instances have len() == 0 and are thus falsy
        return arr if self.dtype is None else arr.astype(self.dtype)

    def _all_array_opt(self, it, level):
        # fast path used by `.all()`: whole-array conversion instead of element by element
        if hasTorch and isinstance(it, torch.Tensor): it = it.detach().cpu().numpy()
        return self._cast(it) if isinstance(it, np.ndarray) else it

    def __ror__(self, it):
        if hasattr(it, "_toNdArray"):
            # custom data types can implement their own conversion, optionally accepting `dtype`
            args = inspect.getfullargspec(it._toNdArray).args[1:]
            weirdArgs = [a for a in args if a != "dtype"]
            if len(weirdArgs) > 0: raise Exception(f"Custom datatype `{type(it)}` has ._toNdArray() method, which expects only `dtype` arguments, but detected these arguments instead: {weirdArgs}. Please fix `{type(it)}`")
            return it._toNdArray() if len(args) == 0 else it._toNdArray(self.dtype)
        if hasPIL and isinstance(it, PIL.Image.Image):
            mode_to_nptype = {'I': np.int32, 'I;16': np.int16, 'F': np.float32}
            img = np.array(it, mode_to_nptype.get(it.mode, np.uint8), copy=True)
            if it.mode == '1': img = 255 * img
            # PIL's size is (width, height); the result is laid out as (channels, height, width)
            img = img.reshape((it.size[1], it.size[0], len(it.getbands())))
            it = img.transpose((2, 0, 1))
        if hasPandas and isinstance(it, pd.core.arraylike.OpsMixin): it = it.to_numpy()
        # detach + cpu so that tensors requiring grad or living on an accelerator convert too
        if hasTorch and isinstance(it, torch.Tensor): it = it.detach().cpu().numpy()
        if not isinstance(it, np.ndarray): it = np.array(list(it))
        return self._cast(it)
class toTensor(BaseCli):
    blurb="Converts several data types to torch.Tensor"
    def __init__(self, dtype=None):
        """Converts generator to :class:`torch.Tensor`. Essentially
        ``torch.tensor(list(it))``. Default dtype is float32. Can convert PIL Image. Example::

            # returns tensor([0., 1., 2.], dtype=torch.float64)
            range(3) | toTensor(torch.float64)

        Objects exposing a ``_toTensor(self)`` or ``_toTensor(self, dtype)``
        method are converted by calling that method.

        :param dtype: dtype of the resulting tensor, defaults to ``torch.float32``"""
        self.dtype = dtype or torch.float32

    def _all_array_opt(self, it, level): return torch.tensor(it, dtype=self.dtype)

    def __ror__(self, it:Iterator[float]) -> "torch.Tensor":
        if hasattr(it, "_toTensor"):
            # custom data types can implement their own conversion, optionally accepting `dtype`
            args = inspect.getfullargspec(it._toTensor).args[1:]
            weirdArgs = [a for a in args if a != "dtype"]
            if len(weirdArgs) > 0: raise Exception(f"Custom datatype `{type(it)}` has ._toTensor() method, which expects only `dtype` arguments, but detected these arguments instead: {weirdArgs}. Please fix `{type(it)}`")
            return it._toTensor() if len(args) == 0 else it._toTensor(self.dtype)
        if not isinstance(it, torch.Tensor): it = torch.from_numpy(it | toNdArray())
        # NOTE(review): the returned expression was missing in this copy of the file (a bare
        # `return`, i.e. None); casting to the configured dtype matches the docstring - confirm upstream
        return it.to(self.dtype)
class toList(BaseCli): # this still exists cause some LLVM optimizations are done on this, and too tired to change that at the moment
    def __init__(self):
        """Materializes the incoming iterator into a list. Example::

            # returns [0, 1, 2, 3, 4]
            range(5) | toList()
            # returns [0, 1, 2, 3, 4]
            range(5) | aS(list)

        This cli is somewhat outdated: it works fine, but ``aS(list)`` does the
        same job. It is kept around so that old projects don't break."""
        super().__init__()

    def _all_array_opt(self, it, level):
        # arrays are already materialized, nothing to do
        return it

    def _typehint(self, inp):
        if isinstance(inp, tListIterSet):
            return tList(inp.child)
        if isinstance(inp, tCollection):
            return inp
        return tList(tAny())

    def __ror__(self, it:Iterator[Any]) -> List[Any]:
        guarded = init.dfGuard(it)
        return list(guarded)

    def _jsF(self, meta):
        fIdx = init._jsFAuto()
        dataIdx = init._jsDAuto()
        code = f"{fIdx} = ({dataIdx}) => {dataIdx}"
        return code, fIdx
def _toRange(it): # _toRange for i, _ in enumerate(it): yield i # _toRange
class toRange(BaseCli):
    blurb="Returns iter(range(len(it))), but incrementally"
    def __init__(self):
        """Effectively returns ``iter(range(len(it)))``. Example::

            # returns [0, 1, 2]
            [3, 2, 5] | toRange() | deref()"""
        super().__init__()

    def __ror__(self, it:Iterator[Any]) -> Iterator[int]:
        # inputs that support len() get a real range, everything else is counted lazily
        try:
            n = len(it)
            return range(n)
        except:
            return _toRange(it)

    def _jsF(self, meta):
        fIdx = init._jsFAuto()
        dataIdx = init._jsDAuto()
        code = f"{fIdx} = ({dataIdx}) => {dataIdx}.toRange()"
        return code, fIdx
# registers a pass on the pattern [toRange, toRange] that keeps only the first cli
# (presumably because chaining two toRange() yields the same indices as one)
tOpt.addPass(lambda cs, ts, _: [cs[0]], [toRange, toRange])
settings.add(
    "arrayTypes",
    (torch.Tensor, np.ndarray) if hasTorch else (np.ndarray,),
    "default array types used to accelerate clis",
)
def genericTypeHint(inp):
    """Type hint of a single element of ``inp``: the child type of a
    list/iterator/set or array type hint, the first child of a collection
    type hint, and ``tAny()`` for anything else."""
    extractors = (
        (tListIterSet, lambda t: t.child),
        (tCollection, lambda t: t.children[0]),
        (tArrayTypes, lambda t: t.child),
    )
    for kind, pick in extractors:
        if isinstance(inp, kind):
            return pick(inp)
    return tAny()
class toSum(BaseCli):
    blurb="Calculates the sum of a list of numbers"
    def __init__(self):
        """Sums up a list of numbers. :class:`torch.Tensor` and
        :class:`numpy.ndarray` can be piped in directly. Example::

            range(10) | toSum() # returns 45
            np.random.randn(2, 3, 4) | toSum().all() | shape() # returns (2,)
        """
        super().__init__()

    def _all_array_opt(self, it, level):
        # reduce over every axis from `level` onwards
        axes = tuple(range(level, len(it.shape)))
        if isinstance(it, np.ndarray):
            return np.sum(it, axes)
        if hasTorch and isinstance(it, torch.Tensor):
            return torch.sum(it, axes)
        return NotImplemented

    def _typehint(self, inp):
        return genericTypeHint(inp)

    def __ror__(self, it:Iterator[float]):
        isArray = isinstance(it, settings.arrayTypes)
        if isArray or (hasPandas and isinstance(it, pd.Series)):
            return it.sum()
        return sum(init.dfGuard(it))

    def _jsF(self, meta):
        fIdx = init._jsFAuto()
        dataIdx = init._jsDAuto()
        code = f"{fIdx} = ({dataIdx}) => {dataIdx}.toSum()"
        return code, fIdx
class toProd(BaseCli):
    blurb="Calculates the product of a list of numbers"
    def __init__(self):
        """Calculates the product of a list of numbers. Can pipe in
        :class:`torch.Tensor` or :class:`numpy.ndarray`. Example::

            range(1,10) | toProd() # returns 362880
            np.random.randn(2, 3, 4) | toProd().all() | shape() # returns (2,)
        """
        super().__init__()

    # NOTE(review): the `*.prod(...)` calls in this class were missing from this copy of the
    # file, leaving syntax errors and empty returns; they are restored by analogy with
    # toSum/toMax - confirm against upstream.
    def _all_array_opt(self, it, level):
        if isinstance(it, np.ndarray): return np.prod(it, tuple(range(level, len(it.shape))))
        elif hasTorch and isinstance(it, torch.Tensor):
            # torch.prod reduces a single dim at a time, so repeatedly collapse dim `level`
            for i in range(level, len(it.shape)): it = torch.prod(it, level)
            return it
        return NotImplemented

    def _typehint(self, inp): return genericTypeHint(inp)

    def __ror__(self, it):
        if isinstance(it, settings.arrayTypes) or (hasPandas and isinstance(it, pd.Series)): return it.prod()
        else: return math.prod(init.dfGuard(it))

    def _jsF(self, meta):
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()
        return f"{fIdx} = ({dataIdx}) => {dataIdx}.toProd()", fIdx
class toAvg(BaseCli):
    blurb="Calculates the average of a list of numbers"
    def __init__(self):
        """Averages a list of numbers. :class:`torch.Tensor` and
        :class:`numpy.ndarray` can be piped in directly. Example::

            range(10) | toAvg() # returns 4.5
            [] | toAvg() # returns nan
            np.random.randn(2, 3, 4) | toAvg().all() | shape() # returns (2,)
        """
        super().__init__()

    def _all_array_opt(self, it, level):
        # reduce over every axis from `level` onwards
        axes = tuple(range(level, len(it.shape)))
        if isinstance(it, np.ndarray):
            return np.mean(it, axes)
        if hasTorch and isinstance(it, torch.Tensor):
            return torch.mean(it, axes)
        return NotImplemented

    def _typehint(self, inp):
        elem = None
        # intentionally not an elif chain: a later match overrides an earlier one
        if isinstance(inp, tListIterSet):
            elem = inp.child
        if isinstance(inp, tCollection):
            elem = inp.children[0]
        if isinstance(inp, tArrayTypes):
            elem = inp.child
        if elem is None:
            return tAny()
        # averaging ints gives floats
        return float if elem == int else elem

    def __ror__(self, it:Iterator[float]):
        isArray = isinstance(it, settings.arrayTypes)
        if isArray or (hasPandas and isinstance(it, pd.Series)):
            return it.mean()
        total = 0
        count = 0
        for value in init.dfGuard(it):
            total += value
            count += 1
        # outside of strict mode an empty input gives nan instead of a ZeroDivisionError
        if count == 0 and not k1lib.settings.cli.strict:
            return float("nan")
        return total / count

    def _jsF(self, meta):
        fIdx = init._jsFAuto()
        dataIdx = init._jsDAuto()
        code = f"{fIdx} = ({dataIdx}) => {dataIdx}.toAvg()"
        return code, fIdx
# torchStd(it, ddof, dim=None): standard deviation of a tensor with divisor N - ddof.
# PyTorch 2 takes an arbitrary `correction`, while PyTorch 1 only knows biased/unbiased.
if hasTorch:
    torchVer = int(torch.__version__.split(".")[0])
    if torchVer >= 2:
        def torchStd(it, ddof, dim=None): return torch.std(it, dim, correction=ddof)
    else:
        def torchStd(it, ddof, dim=None):
            if ddof == 0: return torch.std(it, dim, unbiased=False)
            if ddof == 1: return torch.std(it, dim, unbiased=True)
            raise Exception(f"Please install PyTorch 2, as version 1 don't support correction factor of {ddof}")
else:
    # same signature as the real implementations, so that a 3-argument call reports the
    # missing dependency instead of failing with an unrelated TypeError
    def torchStd(it, ddof, dim=None): raise Exception("PyTorch not installed")
class toStd(BaseCli):
    blurb="Calculates the standard deviation of a list of numbers"
    def __init__(self, ddof:int=0):
        """Computes the standard deviation of a list of numbers. Piping in a
        :class:`torch.Tensor` or :class:`numpy.ndarray` is faster. Example::

            range(10) | toStd() # returns 2.8722813232690143
            [] | toStd() # returns nan
            np.random.randn(2, 3, 4) | toStd().all() | shape() # returns (2,)

        :param ddof: "delta degree of freedom". The divisor used in calculations is ``N - ddof``"""
        self.ddof = ddof

    def _all_array_opt(self, it, level):
        # reduce over every axis from `level` onwards
        axes = tuple(range(level, len(it.shape)))
        if isinstance(it, np.ndarray):
            return np.std(it, ddof=self.ddof, axis=axes)
        if hasTorch and isinstance(it, torch.Tensor):
            return torchStd(it, self.ddof, axes)
        return NotImplemented

    def __ror__(self, it):
        ddof = self.ddof
        if hasPandas:
            if isinstance(it, pd.Series):
                return it.std(ddof=ddof)
            if isinstance(it, pd.DataFrame):
                it = init.dfGuard(it)
        if isinstance(it, settings.arrayTypes):
            if isinstance(it, np.ndarray):
                return np.std(it, ddof=ddof)
            if hasTorch and isinstance(it, torch.Tensor):
                return torchStd(it, ddof)
        # generic iterables are materialized into an array first
        values = np.array(list(it))
        return np.std(values, ddof=ddof)

    def _jsF(self, meta):
        fIdx = init._jsFAuto()
        dataIdx = init._jsDAuto()
        code = f"{fIdx} = ({dataIdx}) => {dataIdx}.toStd()"
        return code, fIdx
toMean = toAvg
class toMedian(BaseCli):
    blurb="Calculates the median of a list of numbers"
    def __init__(self, percentile=50):
        """Calculates the median of a list of numbers. Example::

            range(10) | toMedian() # returns 4.5
            [1, 2, 4] | toMedian() # returns 2.0
            np.random.randn(3, 4, 5) | toMedian().all(1) | shape() # returns (3,)

        :param percentile: which percentile to compute, 50 being the median"""
        self.percentile = percentile

    def _all_array_opt(self, it, level):
        q = self.percentile; n = len(it.shape)
        if n == level: return it
        # shape entries are ints, so they have to be stringified before joining
        if n < level: raise init.ArrayOptException(f"You're trying to do `np.random.randn({', '.join(map(str, it.shape))}) | toMedian().all({level})` which does not make sense, as the array's dimension is less than the .all() dimension")
        # merge all trailing dims into one, so that a single reduction over dim `level` is enough
        it = it | cli.joinSt(n-level-1).all(level)
        if q == 50:
            if isinstance(it, np.ndarray): return np.median(it, level)
            if hasTorch and isinstance(it, torch.Tensor): return torch.median(it, level).values
        else:
            if isinstance(it, np.ndarray): return np.percentile(it, q, level)
            # the percentile is computed by numpy, then moved back to the original dtype and device
            if hasTorch and isinstance(it, torch.Tensor): return torch.tensor(np.percentile(it.cpu().numpy(), q, level), dtype=it.dtype, device=it.device)
        return NotImplemented

    def __ror__(self, it):
        q = self.percentile
        if hasPandas and isinstance(it, pd.core.arraylike.OpsMixin): it = it.to_numpy()
        if isinstance(it, np.ndarray): return np.median(it) if q == 50 else np.percentile(it, q)
        if hasTorch and isinstance(it, torch.Tensor): return torch.median(it) if q == 50 else torch.tensor(np.percentile(it.cpu().numpy(), q), dtype=it.dtype, device=it.device)
        # first try the input as-is, then fall back to a fully dereferenced copy
        try: return np.percentile(it, q)
        except Exception: return np.percentile(it | cli.deref(), q)
class toMax(BaseCli):
    blurb="Calculates the max value of a list of numbers"
    def __init__(self):
        """Finds the largest of a bunch of numbers. :class:`torch.Tensor` and
        :class:`numpy.ndarray` can be piped in directly. Example::

            [2, 5, 6, 1, 2] | toMax() # returns 6
            np.random.randn(2, 3, 4) | toMax().all() | shape() # returns (2,)
        """
        super().__init__()

    def _all_array_opt(self, it, level):
        nDims = len(it.shape)
        if isinstance(it, np.ndarray):
            return np.max(it, tuple(range(level, nDims)))
        if hasTorch and isinstance(it, torch.Tensor):
            # collapse dim `level` once for every trailing dimension
            for _ in range(level, nDims):
                it = torch.max(it, level)[0]
            return it
        return NotImplemented

    def __ror__(self, it:Iterator[float]) -> float:
        isArray = isinstance(it, settings.arrayTypes)
        if isArray or (hasPandas and isinstance(it, pd.Series)):
            return it.max()
        return max(it)

    def _jsF(self, meta):
        fIdx = init._jsFAuto()
        dataIdx = init._jsDAuto()
        code = f"{fIdx} = ({dataIdx}) => {dataIdx}.toMax()"
        return code, fIdx
class toMin(BaseCli):
    blurb="Calculates the min value of a list of numbers"
    def __init__(self):
        """Finds the smallest of a bunch of numbers. :class:`torch.Tensor` and
        :class:`numpy.ndarray` can be piped in directly. Example::

            [2, 5, 6, 1, 2] | toMin() # returns 1
            np.random.randn(2, 3, 4) | toMin().all() | shape() # returns (2,)
        """
        super().__init__()

    def _all_array_opt(self, it, level):
        nDims = len(it.shape)
        if isinstance(it, np.ndarray):
            return np.min(it, tuple(range(level, nDims)))
        if hasTorch and isinstance(it, torch.Tensor):
            # collapse dim `level` once for every trailing dimension
            for _ in range(level, nDims):
                it = torch.min(it, level)[0]
            return it
        return NotImplemented

    def __ror__(self, it:Iterator[float]) -> float:
        isArray = isinstance(it, settings.arrayTypes)
        if isArray or (hasPandas and isinstance(it, pd.Series)):
            return it.min()
        return min(it)

    def _jsF(self, meta):
        fIdx = init._jsFAuto()
        dataIdx = init._jsDAuto()
        code = f"{fIdx} = ({dataIdx}) => {dataIdx}.toMin()"
        return code, fIdx
class toArgmin(BaseCli):
    blurb="Grabs the min value's index"
    def __init__(self):
        """Get the input iterator's index of the min value. Example::

            [2, 3, 4, 1, 5] | toArgmin() # returns 3
            np.random.randn(3, 4, 5) | toArgmin().all() | shape() # returns (3,)
        """
        pass

    def _all_array_opt(self, it, level):
        n = len(it.shape)
        # shape entries are ints, so they have to be stringified before joining
        if n < level: raise Exception(f"You're trying to do `np.random.randn({', '.join(map(str, it.shape))}) | toArgmin().all({level})` which does not make sense, as the array's dimension is less than the .all() dimension")
        # merge all trailing dims into one, so the index refers to the flattened remainder
        it = it | cli.joinSt(n-level-1).all(level); return it.argmin(level)

    def __ror__(self, it):
        if isinstance(it, k1lib.settings.cli.arrayTypes): return it.argmin().item()
        if hasPandas and isinstance(it, pd.Series): return it.argmin().item()
        # sized inputs convert directly; anything else is dereferenced first
        try: len(it); return np.array(it).argmin().item()
        except Exception: return np.array(it | cli.deref()).argmin().item()
class toArgmax(BaseCli):
    blurb="Grabs the max value's index"
    def __init__(self):
        """Get the input iterator's index of the max value. Example::

            [2, 3, 4, 1, 5] | toArgmax() # returns 4
            np.random.randn(3, 4, 5) | toArgmax().all() | shape() # returns (3,)
        """
        pass

    def _all_array_opt(self, it, level):
        n = len(it.shape)
        # shape entries are ints, so they have to be stringified before joining
        if n < level: raise Exception(f"You're trying to do `np.random.randn({', '.join(map(str, it.shape))}) | toArgmax().all({level})` which does not make sense, as the array's dimension is less than the .all() dimension")
        # merge all trailing dims into one, so the index refers to the flattened remainder
        it = it | cli.joinSt(n-level-1).all(level); return it.argmax(level)

    def __ror__(self, it):
        if isinstance(it, k1lib.settings.cli.arrayTypes): return it.argmax().item()
        if hasPandas and isinstance(it, pd.Series): return it.argmax().item()
        # sized inputs convert directly; anything else is dereferenced first
        try: len(it); return np.array(it).argmax().item()
        except Exception: return np.array(it | cli.deref()).argmax().item()
settings.add("font", None, "default font file. Best to use .ttf files, used by toImg()")
settings.add("chem", k1lib.Settings().add("imgSize", 200, "default image size used in toImg() when drawing rdkit molecules"), "chemistry-related settings")
class Svg(str):
    """String holding svg markup, that can render itself to a PIL image
    (via ``toImg()``) and displays inline in notebooks."""
    def _toImg(self, **kwargs):
        import tempfile, cairosvg
        # NOTE(review): the `a.name` references were missing in this copy of the file; they are
        # restored following the same temp-file pattern toImg uses for graphviz - confirm upstream
        with tempfile.NamedTemporaryFile() as a:
            cairosvg.svg2png(bytestring=f"{self}", write_to=a.name)
            im = a.name | toImg()
            im.load() # PIL opens lazily, so read the pixels before the temp file disappears
        return im
    def _repr_html_(self): return self
def cropToContentNp(ogIm, pad=10):
    """Crops away the uniform border around the content of an image array.

    Every pixel differing from the image's max value counts as content.

    :param ogIm: array of shape (H, W) or (C, H, W)
    :param pad: number of border pixels to keep around the content
    :return: the cropped array, or ``ogIm`` itself if it has no content at all"""
    dim = len(ogIm.shape); im = ogIm
    if dim > 2: im = im.mean(0)
    coords = np.argwhere(im.max()-im)
    if len(coords) == 0: return ogIm # completely uniform image, nothing to crop to
    x_min, y_min = coords.min(axis=0); x_max, y_max = coords.max(axis=0)
    # clamp at 0: a negative slice start would wrap around and produce a wrong (often empty) crop
    x0 = max(x_min-pad, 0); y0 = max(y_min-pad, 0)
    return ogIm[x0:x_max+1+pad, y0:y_max+1+pad] if dim == 2 else ogIm[:,x0:x_max+1+pad, y0:y_max+1+pad]
def cropToContentPIL(im, pad=0):
    """Same as :meth:`cropToContentNp`, but takes in and returns a PIL image."""
    im = im | toTensor(int) | cli.op().numpy() | cli.aS(cropToContentNp, pad)
    # multi-channel arrays come out as (C, H, W) and PIL wants (H, W, C)
    return torch.from_numpy(im).permute(1, 2, 0) | toImg() if len(im.shape) > 2 else im | toImg()
class toImg(BaseCli):
    blurb="Converts multiple data types into a PIL image"
    def __init__(self, closeFig=True, crop=True):
        """Converts multiple data types into a PIL image. Example::

            ls(".") | toImg().all() | item() # grabs first image in the current folder
            torch.randn(100, 200) | toImg() # converts from tensor/array to image
            "abc.jpg" | toImg() | toBytes() | toImg() # grabs image, converts to byte stream, and converts back to image
            ["abc", "def"] | toImg() # converts paragraphs to image
            "c1ccc(C)cc1" | toMol() | toImg() # converts SMILES string to molecule, then to image
            ["ab", "bc", "ca"] | (kgv.sketch() | kgv.edges()) | toHtml() | toImg() # sketches a graphviz plot, converts to svg then renders the svg as an image
            df | toHtml() | toImg() # converts pandas data frame to html, then render it to image

        You can also save a matplotlib figure by piping in a :class:`matplotlib.figure.Figure` object::

            x = np.linspace(0, 4)
            plt.plot(x, x**2)
            plt.gcf() | toImg()

        .. note::

            If you are working with image tensors, which typically have dimensions
            of (C, H, W), you have to permute it to PIL's (H, W, C) first before
            passing it into this cli. Also it's expected that your tensor image
            ranges from 0-255, and not 0-1. Make sure you renormalize it

        :param closeFig: if input is a matplotlib figure, then closes the figure after generating the image
        :param crop: whether to crop white spaces around an image or not"""
        import PIL; self.PIL = PIL; self.closeFig = closeFig; self.crop = crop

    def _typehint(self, inp):
        return PIL.Image.Image

    def __ror__(self, path) -> "PIL.Image.Image":
        # NOTE(review): several tokens in this method (Image.open(...), Image.new(, a.name) were
        # missing in this copy of the file and are restored from context - confirm upstream
        if hasattr(path, "_toImg"): return path._toImg(closeFig=self.closeFig, crop=self.crop)
        if isinstance(path, str): return self.PIL.Image.open(os.path.expanduser(path))
        if isinstance(path, bytes): return self.PIL.Image.open(io.BytesIO(path))
        # detach + cpu so that tensors requiring grad or living on an accelerator convert too
        if hasTorch and isinstance(path, torch.Tensor): path = path.detach().cpu().numpy()
        if isinstance(path, np.ndarray):
            return self.PIL.Image.fromarray(path.astype("uint8"))
        if isinstance(path, mpl.figure.Figure):
            canvas = path.canvas; canvas.draw()
            img = self.PIL.Image.frombytes('RGB', canvas.get_width_height(), canvas.tostring_rgb())
            if self.closeFig: plt.close(path)
            # honor `crop` here as well, like the other branches do
            return img | (cli.aS(cropToContentPIL) if self.crop else cli.iden())
        if hasGraphviz and isinstance(path, graphviz.Digraph):
            import tempfile
            with tempfile.NamedTemporaryFile() as a:
                path.render(a.name, format="jpeg")
                fn = f"{a.name}.jpeg"; im = fn | toImg()
                im.load() # PIL opens lazily, so read the pixels before the file is removed
                try: os.remove(fn)
                except OSError: pass # best-effort cleanup of the rendered file
            return im
        if hasRdkit and isinstance(path, rdkit.Chem.rdchem.Mol):
            sz = settings.chem.imgSize
            return self.__ror__(rdkit.Chem.Draw.MolsToGridImage([path], subImgSize=[sz, sz]).data) | (cli.aS(cropToContentPIL) if self.crop else cli.iden())
        if hasPandas and isinstance(path, pd.DataFrame): path = path | cli.toHtml()
        if isinstance(path, k1lib.viz.Html): return imgkit.from_string(path, False, options={'format': 'jpg'}) | toImg()
        path = path | cli.deref()
        if len(path) > 0 and isinstance(path[0], str):
            # list of strings: draw each string as a line of text on a white canvas
            from PIL import Image, ImageDraw, ImageFont
            h = path | cli.shape(0); w = path | cli.shape(0).all() | cli.aS(max)
            image = Image.new("L", ((w+1)*20, (h+1)*60), 255)
            font = ImageFont.truetype(settings.font, 18) if settings.font else None
            ImageDraw.Draw(image).text((20, 20), path | cli.join("\n"), 0, font=font)
            # cropping works on 0-1 values, so normalize first and scale back afterwards
            return np.array(image)/255 | (cli.aS(cropToContentNp) if self.crop else cli.iden()) | cli.op()*255 | toImg()
        return NotImplemented
# types that toNpImg can turn into a numpy image without first going through toImg()
_nonNpImgTypes = (np.ndarray,)
if hasTorch: _nonNpImgTypes += (torch.Tensor,)
if hasPIL: _nonNpImgTypes += (PIL.Image.Image,)
class toNpImg(BaseCli):
    def __init__(self):
        """Converts the input to a numpy array holding the image data"""
        pass

    def __ror__(self, it):
        # custom data types can provide their own conversion
        if hasattr(it, "_toNpImg"):
            return it._toNpImg()
        if not isinstance(it, _nonNpImgTypes):
            it = it | toImg()
        # conversion chain: PIL image -> tensor -> numpy array -> uint8
        if hasPIL and isinstance(it, PIL.Image.Image):
            it = it | toTensor()
        if hasTorch and isinstance(it, torch.Tensor):
            it = it.numpy()
        if isinstance(it, np.ndarray):
            it = it.astype(np.uint8)
        return it
class toRgb(BaseCli):
    blurb="Converts grayscale/rgb PIL image to rgb image"
    def __init__(self):
        """Converts greyscale/rgb PIL image to rgb image. Example::

            # reads image file and converts it to rgb
            "a.png" | toImg() | toRgb()"""
        import PIL; self.PIL = PIL

    def _typehint(self, inp): return inp

    def __ror__(self, i):
        if hasattr(i, "_toRgb"): return i._toRgb()
        if i.getbands() == ("R", "G", "B"): return i
        # NOTE(review): the constructor call was missing in this copy of the file; creating a
        # blank RGB image of the same size to paste onto fits the following line - confirm upstream
        rgbI = self.PIL.Image.new("RGB", i.size)
        rgbI.paste(i); return rgbI
class toRgba(BaseCli):
    blurb="Converts random PIL image to rgba image"
    def __init__(self):
        """Converts random PIL image to rgba image. Example::

            # reads image file and converts it to rgba
            "a.png" | toImg() | toRgba()"""
        import PIL; self.PIL = PIL

    def _typehint(self, inp): return inp

    def __ror__(self, i):
        if hasattr(i, "_toRgba"): return i._toRgba()
        if i.getbands() == ("R", "G", "B", "A"): return i
        # NOTE(review): the constructor call was missing in this copy of the file; creating a
        # blank RGBA image of the same size to paste onto fits the following line - confirm upstream
        rgbI = self.PIL.Image.new("RGBA", i.size)
        rgbI.paste(i); return rgbI
class toGray(BaseCli):
    blurb="Converts random PIL image to a grayscale image"
    def __init__(self):
        """Converts random PIL image to a grayscale image. Example::

            # reads image file and converts it to grayscale
            "a.png" | toImg() | toGray()"""
        import PIL; self.PIL = PIL

    def _typehint(self, inp): return inp

    def __ror__(self, i):
        if hasattr(i, "_toGray"): return i._toGray()
        # getbands() returns a tuple, so compare against ("L",) - ("L") is just the string "L"
        if i.getbands() == ("L",): return i
        # `import PIL` alone does not load the ImageOps submodule
        from PIL import ImageOps
        return ImageOps.grayscale(i)
class toDict(BaseCli):
    blurb="Converts 2 Iterators, 1 key, 1 value into a dictionary"
    def __init__(self, rows=True, f=None):
        """Converts 2 Iterators, 1 key, 1 value into a dictionary. Example::

            # returns {1: 3, 2: 4}
            [[1, 3], [2, 4]] | toDict()
            # returns {1: 3, 2: 4}
            [[1, 2], [3, 4]] | toDict(False)

        If ``rows`` is a string, then it will build a dictionary from key-value
        pairs delimited by this character. Each element is split at the first
        occurrence of the delimiter. For example::

            ['gene_id "ENSG00000290825.1"',
             'transcript_id "ENST00000456328.2"',
             'gene_type "lncRNA"',
             'level 2',
             'tag "basic"',
             'tag "Ensembl_canonical"'] | toDict(" ")

        That returns::

            {'gene_id': '"ENSG00000290825.1"',
             'transcript_id': '"ENST00000456328.2"',
             'gene_type': '"lncRNA"',
             'level': '2',
             'tag': '"Ensembl_canonical"'}

        :param rows: if True, reads input in row by row, else reads in list of
            columns. If a non-empty string, it's the key-value delimiter
        :param f: if specified, return a defaultdict that uses this function as its generator"""
        self.rows = rows
        if f is not None: self.f = lambda d: defaultdict(f, d)
        else: self.f = lambda x: x

    def __ror__(self, it) -> dict:
        r = self.rows; f = self.f
        if r:
            if isinstance(r, str):
                # split at the first delimiter only: the key is what comes before it, the value
                # is everything after it. Uses the configured delimiter (it used to be
                # hard-coded to a space) and goes through `f` like the other branches
                pairs = (x.partition(r) for x in it)
                return f({_k:_v for _k, _, _v in pairs})
            return f({_k:_v for _k, _v in it})
        return f({_k:_v for _k, _v in zip(*it)})

    def _jsF(self, meta):
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()
        if not self.rows: raise Exception("toDict._jsF() doesn't support .rows=False yet")
        return f"{fIdx} = ({dataIdx}) => {dataIdx}.toDict()", fIdx
def _toop(toOp, c, force, defaultValue): # _toop return cli.apply(toOp, c) | (cli.apply(lambda x: x or defaultValue, c) if force else cli.filt(cli.op() != None, c)) # _toop def _toFloat(e) -> Union[float, None]: # _toFloat try: return float(e) # _toFloat except: return None # _toFloat
class toFloat(BaseCli):
    blurb="Converts an iterator into a list of floats"
    def __init__(self, *columns, mode=2):
        """Converts every row into a float. Example::

            # returns [1, 3, -2.3]
            ["1", "3", "-2.3"] | toFloat() | deref()
            # returns [[1.0, 'a'], [2.3, 'b'], [8.0, 'c']]
            [["1", "a"], ["2.3", "b"], [8, "c"]] | toFloat(0) | deref()

        With weird rows::

            # returns [[1.0, 'a'], [8.0, 'c']]
            [["1", "a"], ["c", "b"], [8, "c"]] | toFloat(0) | deref()
            # returns [[1.0, 'a'], [0.0, 'b'], [8.0, 'c']]
            [["1", "a"], ["c", "b"], [8, "c"]] | toFloat(0, mode=1) | deref()

        This also works well with :class:`torch.Tensor` and :class:`numpy.ndarray`,
        as they will not be broken up into an iterator::

            # returns a numpy array, instead of an iterator
            np.array(range(10)) | toFloat()

        :param columns: if nothing, then will convert each row. If available, then convert all the specified columns
        :param mode: different conversion styles

            - 0: simple ``float()`` function, fastest, but will throw errors if it can't be parsed
            - 1: if there are errors, then replace it with zero
            - 2: if there are errors, then eliminate the row"""
        self.columns = columns; self.mode = mode

    def __ror__(self, it):
        columns = self.columns; mode = self.mode
        if len(columns) == 0:
            if hasPandas and isinstance(it, pd.core.arraylike.OpsMixin): it = it.to_numpy()
            if isinstance(it, np.ndarray): return it.astype(float)
            # guarded by hasTorch, as `torch` is only a placeholder when PyTorch isn't installed
            if hasTorch and isinstance(it, torch.Tensor): return it.float()
            if mode == 0: return (float(e) for e in it)
            return it | _toop(_toFloat, None, mode == 1, 0.0)
        else:
            if hasPandas and isinstance(it, pd.DataFrame):
                # rebuild the data frame column by column, casting only the requested ones
                cols = [it[c] for c in list(it)]; nameGen = it.newColName(None)
                for c in columns: cols[c] = cols[c].copy().astype(float)
                return pd.DataFrame({getattr(c, "name", next(nameGen)):c for c in cols})
            return it | cli.init.serial(*(_toop(_toFloat, c, mode == 1, 0.0) for c in columns))

    def _jsF(self, meta):
        # `mode` has to be read from self, and every branch returns a (code, function name) pair
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto(); cols = self.columns; mode = self.mode
        if len(cols) == 0:
            # `a === a` is false only for NaN, which is what parseFloat gives for unparseable input
            if mode == 0: return f"{fIdx} = ({dataIdx}) => {dataIdx}.map((v) => parseFloat(v))", fIdx
            if mode == 1: return f"{fIdx} = ({dataIdx}) => {dataIdx}.map((v) => {{ const a = parseFloat(v); return a === a ? a : 0 }})", fIdx
            if mode == 2: return f"{fIdx} = ({dataIdx}) => {{ const ans = []; for (const v of {dataIdx}) {{ const a = parseFloat(v); if (a === a) ans.push(a); }}; return ans; }}", fIdx
        else:
            # NOTE(review): the row-conversion expressions were missing in this copy of the file
            # and are restored from what was left of the template. As restored, the generated JS
            # converts every column of a row rather than only `cols` - confirm upstream
            conv = "row.map((v) => parseFloat(v))"
            if mode == 0: body = f"ans.push({conv});"
            elif mode == 1: body = f"ans.push({conv}.map((v) => (v === v ? v : 0)));"
            elif mode == 2: body = f"const rowp = {conv};if (rowp.map((v) => v === v).every((v) => v)) ans.push(rowp);"
            else: body = ""
            return f"""\
{fIdx} = ({dataIdx}) => {{
    const ans = [];
    for (const row of {dataIdx}) {{
        {body}
    }}
    return ans;
}}""", fIdx
def _toInt(e) -> Union[int, None]: # _toInt try: return int(float(e)) # _toInt except: return None # _toInt
class toInt(BaseCli):
    blurb="Converts an iterator into a list of ints"
    def __init__(self, *columns, mode=2):
        """Converts every row into an integer. Example::

            # returns [1, 3, -2]
            ["1", "3", "-2.3"] | toInt() | deref()

        :param columns: if nothing, then will convert each row. If available, then convert all the specified columns
        :param mode: different conversion styles

            - 0: simple ``int()`` function, fastest, but will throw errors if it can't be parsed
            - 1: if there are errors, then replace it with zero
            - 2: if there are errors, then eliminate the row

        See also: :meth:`toFloat`"""
        self.columns = columns; self.mode = mode

    def __ror__(self, it):
        columns = self.columns; mode = self.mode
        if len(columns) == 0:
            if isinstance(it, np.ndarray): return it.astype(int)
            # guarded by hasTorch, as `torch` is only a placeholder when PyTorch isn't installed.
            # NOTE(review): the returned expression was missing in this copy of the file; `.int()`
            # mirrors toFloat's `.float()` - confirm upstream
            if hasTorch and isinstance(it, torch.Tensor): return it.int()
            if mode == 0: return (int(e) for e in it)
            return it | _toop(_toInt, None, mode == 1, 0)
        else:
            if hasPandas and isinstance(it, pd.DataFrame):
                # rebuild the data frame column by column, casting only the requested ones
                cols = [it[c] for c in list(it)]; nameGen = it.newColName(None)
                for c in columns: cols[c] = cols[c].copy().astype(int)
                return pd.DataFrame({getattr(c, "name", next(nameGen)):c for c in cols})
            # the default is the int 0: _toop replaces every falsy value, so a float default
            # would turn legitimately parsed zeros into 0.0
            return it | cli.init.serial(*(_toop(_toInt, c, mode == 1, 0) for c in columns))

    def _jsF(self, meta):
        # `mode` has to be read from self, and every branch returns a (code, function name) pair
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto(); cols = self.columns; mode = self.mode
        if len(cols) == 0:
            # `a === a` is false only for NaN, which is what parseInt gives for unparseable input
            if mode == 0: return f"{fIdx} = ({dataIdx}) => {dataIdx}.map((v) => parseInt(v))", fIdx
            if mode == 1: return f"{fIdx} = ({dataIdx}) => {dataIdx}.map((v) => {{ const a = parseInt(v); return a === a ? a : 0 }})", fIdx
            if mode == 2: return f"{fIdx} = ({dataIdx}) => {{ const ans = []; for (const v of {dataIdx}) {{ const a = parseInt(v); if (a === a) ans.push(a); }}; return ans; }}", fIdx
        else:
            # NOTE(review): the row-conversion expressions were missing in this copy of the file
            # and are restored from what was left of the template. As restored, the generated JS
            # converts every column of a row rather than only `cols` - confirm upstream.
            # parseInt is wrapped in an arrow function, as Array.map would otherwise pass the
            # element index in as the radix
            conv = "row.map((v) => parseInt(v))"
            if mode == 0: body = f"ans.push({conv});"
            elif mode == 1: body = f"ans.push({conv}.map((v) => (v === v ? v : 0)));"
            elif mode == 2: body = f"const rowp = {conv};if (rowp.map((v) => v === v).every((v) => v)) ans.push(rowp);"
            else: body = ""
            return f"""\
{fIdx} = ({dataIdx}) => {{
    const ans = [];
    for (const row of {dataIdx}) {{
        {body}
    }}
    return ans;
}}""", fIdx
class toBytes(BaseCli):
    blurb="Converts several object types to bytes"
    def __init__(self, dataType=None):
        """Converts several object types to bytes. Example::

    # converts string to bytes
    "abc" | toBytes()
    # converts image to bytes in jpg format
    torch.randn(200, 100) | toImg() | toBytes()
    # converts image to bytes in png format
    torch.randn(200, 100) | toImg() | toBytes("PNG")
    "some_file.mp3" | toAudio() | toBytes("mp3")

If it doesn't know how to convert to bytes, it will just pickle it

.. admonition:: Custom datatype

    It is possible to build objects that can interoperate with this cli,
    like this::

        class custom1:
            def __init__(self, config=None): ...
            def _toBytes(self): return b"abc"

        class custom2:
            def __init__(self, config=None): ...
            def _toBytes(self, dataType):
                if dataType == "png": return b"123"
                else: return b"456"

        custom1() | toBytes()      # returns b"abc"
        custom2() | toBytes()      # returns b"456"
        custom2() | toBytes("png") # returns b"123"

    When called upon, :class:`toBytes` will detect that the input has the
    ``_toBytes`` method, which will prompt it to execute that method of the
    complex object. Of course, this means that you can return anything, not
    necessarily bytes, but to maintain intuitiveness, you should return
    either bytes or iterator of bytes

:param dataType: depending on input. If it's an image then this can be png,
    jpg. If it's a sound then this can be mp3, wav or things like that"""
        self.dataType = dataType
    def __ror__(self, it):
        """Dispatches on the input: str, PIL image, object exposing
        ``_toBytes``, and finally a ``dill`` pickle as the catch-all.

        :raises Exception: if ``it._toBytes`` takes 2 or more arguments"""
        if isinstance(it, str): return it.encode()
        if hasPIL and isinstance(it, PIL.Image.Image):
            it = it | toRgb(); buffered = io.BytesIO()
            it.save(buffered, format=(self.dataType or "JPEG"))
            return buffered.getvalue()
        if hasattr(it, "_toBytes"):
            # [1:] drops the first entry of the reported argument list (self for a bound method)
            n = len(inspect.getfullargspec(it._toBytes).args[1:])
            if n == 0: return it._toBytes()
            elif n == 1: return it._toBytes(self.dataType)
            else: raise Exception(f"{it.__class__.__name__} have 2 or more arguments, which is unsupported")
        import dill; return dill.dumps(it)
mpld3 = k1lib.dep("mpld3", url="")
class DataUri:
    """Thin wrapper around a data uri string that exposes its mime type
    and knows how to render itself in a notebook."""
    def __init__(self, uri:str):
        self.uri = uri # "data:image/png;base64, ..."
        header = uri.partition(";")[0] # "data:image/png"
        self.mime = header.rpartition(":")[2] # "image/png"
        self.mimeBase = self.mime.partition("/")[0] # "image"
    def _repr_html_(self):
        """Html for images and for base64-encoded html; None for any other mime type."""
        if self.mimeBase == "image":
            return f"<img src=\"{self.uri}\"/>"
        if self.mime == "text/html":
            payload = self.uri.rpartition("base64,")[2]
            return base64.b64decode(payload).decode()
    def __repr__(self):
        # long uris are cut at 75 characters to keep the repr readable
        shown = self.uri
        if len(shown) > 75: shown = shown[:75] + '...'
        return f"<DataUri mime='{self.mime}', self.uri='{shown}'>"
def _dataUriHtml(it):
    """Wraps an html string into a base64 ``text/html`` :class:`DataUri`."""
    encoded = base64.b64encode(it.encode()).decode()
    return DataUri(f"data:text/html;base64, {encoded}")
class toDataUri(BaseCli):
    blurb="Converts several object types into data uri scheme"
    def __init__(self):
        """Converts incoming object into data uri scheme.
Data uris are the things that look like "data:image/png;base64, ...",
or "data:text/html;base64, ...". This is a convenience tool mainly
for other tools, and not quite useful directly. Example::

    randomImg = cat("some_image.png", False) | toImg() # returns PIL image
    randomImg | toDataUri()            # returns k1lib.cli.conv.DataUri object with .mime field "image/png" and .uri field "data:image/png;base64, ..."
    randomImg | toDataUri() | toHtml() # returns html string `<img src="data:image/png;base64, ..."/>`
    randomImg | toHtml()               # same like above. toHtml() actually calls toDataUri() behind the scenes
    randomImg | toDataUri() | toAnchor() # creates anchor tag (aka link elements "<a></a>") that, when clicked, displays the image in a new tab
    randomImg | toAnchor()             # same as above. toAnchor() actually calls toDataUri() behind the scenes
"""
        self.throw = False # can be configured by outside clis, like toHtml()
    def __ror__(self, it):
        """Strings are wrapped as html, DataUri objects pass straight through,
        PIL images are encoded as png, and anything else is asked for its
        ``_toDataUri()``. If that call fails, the object's html
        representation is wrapped instead, unless ``self.throw`` is set."""
        if isinstance(it, str):
            return _dataUriHtml(it)
        if isinstance(it, DataUri):
            return it
        if hasPIL and isinstance(it, PIL.Image.Image):
            b64 = base64.b64encode(it | toBytes(dataType="PNG")).decode()
            return DataUri(f"data:image/png;base64, {b64}")
        try:
            return DataUri(it._toDataUri())
        except Exception as e:
            if self.throw:
                raise Exception(f"toDataUri() called on an unfamiliar object, and the object doesn't implement _toDataUri(). Error: {e}")
            html_ = it | toHtml()
            return _dataUriHtml(html_)
class toAnchor(BaseCli):
    blurb="Converts several object types into a html anchor tag"
    def __init__(self, text:str="click here"):
        """Converts incoming object into a html anchor tag that, when clicked,
displays the incoming object's html in another tab. Example::

    randomImg = cat("some_image.png", False) | toImg() # returns PIL image
    randomImg | toAnchor() # returns html string `<a href="data:image/png;base64, ..."></a>`

On some browsers, there's sort of a weird bug where a new tab would open, but
there's nothing displayed on that tab. If you see this is happening, just press
F5 or Ctrl+R to refresh the page and it should display everything nicely

:param text: text to display inside of the anchor"""
        self.text = text
    def __ror__(self, it:str):
        """Builds the ``<a>`` tag, using the data uri of ``it`` as the href."""
        href = (it | toDataUri()).uri
        return k1lib.viz.Html(f"<a href=\"{href}\" target=\"_blank\">{self.text}</a>")
class toHtml(BaseCli):
    blurb="Converts several object types to html"
    def __init__(self):
        """Converts several object types to html. Example::

    # converts PIL image to html <img> tag
    torch.randn(200, 100) | toImg() | toHtml()
    # converts graphviz graph to svg text (which is essentially html)
    g = k1.digraph(); g(*"abc"); g(*"bcd"); g | toHtml()
    # converts plotly graphs to html
    import plotly.express as px; import pandas as pd
    df = pd.DataFrame({'x': [1, 2, 3, 4, 5], 'y': [10, 11, 12, 14, 15]})
    fig = px.line(df, x='x', y='y', title='Simple Line Chart')
    fig | toHtml()
    # converts matplotlib plot to image, and then to html. Do this if you want a static plot
    x = np.linspace(-2, 2); y = x**2
    plt.plot(x, x**2); plt.gcf() | toImg() | toHtml()
    # converts matplotlib plot to D3.js html sketch
    plt.plot(x, x**2); plt.gcf() | toHtml()
"""
        pass
    def __ror__(self, it): return k1lib.viz.Html(self._ror_(it))
    def _ror_(self, it):
        """Returns the raw html string for ``it``.

        Tries, in order: plain string, plotly figure, matplotlib figure,
        graphviz graph, ``_repr_html_()``, ``_toHtml()``, the data uri
        representation, and finally ``__repr__()``."""
        if isinstance(it, str): return it
        if hasPlotly and isinstance(it, plotly.graph_objs._figure.Figure):
            out = io.StringIO(); it.write_html(out); return out.getvalue()
        if isinstance(it, mpl.figure.Figure):
            res = mpld3.fig_to_html(it); plt.close(it); return res
        if hasGraphviz and isinstance(it, graphviz.Digraph):
            import tempfile
            # the temp file only reserves a unique base name; render() is
            # given that name and the svg is read back from "<name>.svg"
            with tempfile.NamedTemporaryFile() as a:
                it.render(a.name, format="svg")
                fn = f"{a.name}.svg"; im = cli.cat(fn) | cli.join("")
                try: os.remove(fn)
                except OSError: pass # best-effort cleanup
            return Svg(im)
        # the remaining conversions are best-effort probes: an object may lack
        # the hook or the hook may fail, in which case the next one is tried
        try:
            res = it._repr_html_()
            if res: return res
        except Exception: pass
        try:
            res = it._toHtml()
            if res: return res
        except Exception: pass
        try:
            f = toDataUri(); f.throw = True # make toDataUri raise instead of calling back into toHtml
            res = (it | f)._repr_html_()
            if res: return res
        except Exception: pass
        return it.__repr__()
    def _jsF(self, meta): return "", ""
# Optional rdkit integration: toMol/toSmiles are only defined (and only added
# to __all__) when every rdkit import below succeeds. Any failure inside the
# try block is swallowed, leaving the module without these two functions.
try:
    from rdkit import Chem
    from rdkit.Chem import Draw
    from rdkit.Chem import AllChem
    from rdkit.Chem.Draw import IPythonConsole
    IPythonConsole.drawOptions.addAtomIndices = True
    __all__ = [*__all__, "toMol", "toSmiles"]
    def toMol():
        """Smiles to molecule. Returns a cli that applies
``Chem.MolFromSmiles`` to whatever is piped in. Example::

    "c1ccc(C)cc1" | toMol()"""
        return cli.aS(Chem.MolFromSmiles)
    def toSmiles():
        """Molecule to smiles. Returns a cli that applies
``Chem.MolToSmiles`` to whatever is piped in. Example::

    "c1ccc(C)cc1" | toMol() | toSmiles()"""
        return cli.aS(Chem.MolToSmiles)
except: pass
import unicodedata, hashlib
def toAscii():
    """Converts complex unicode text to its base ascii form. Example::

    "hà nội" | toAscii() # returns b"ha noi"

The text is decomposed with NFKD normalization, then encoded to ascii with
every character that has no ascii equivalent dropped, so the result is a
``bytes`` object."""
    def stripAccents(word):
        decomposed = unicodedata.normalize('NFKD', word)
        return decomposed.encode('ascii', 'ignore')
    return cli.aS(stripAccents)
def toHash() -> str:
    """Converts some string into some hash string. Example::

    "abc" | toHash() # returns 'gASVJAAAAAAAAABDILp4Fr+PAc/qQUFA3l2uIiOwA2Gjlhd6nLQQ/2HyABWtlC4='

Why not just use the builtin function ``hash("abc")``? Because it generates
different hashes for different interpreter sessions, and that breaks many of
my applications that need the hash value to stay constant forever.

The incoming object is formatted with an f-string, hashed with sha256 and
the digest is passed through ``k1lib.encode``."""
    def hashF(msg:str) -> str:
        digest = hashlib.sha256(f"{msg}".encode()).digest()
        return k1lib.encode(digest)
    return cli.aS(hashF)
import csv # toHash settings.add("toCsv", k1lib.Settings().add("df", False, "if False, use csv.reader (incrementally), else use pd.read_csv (all at once, might be huge!)"), "conv.toCsv() settings") # toHash
class toCsv(BaseCli):
    blurb="Converts several object types into a table/dataframe"
    def __init__(self, allSheets=False):
        """Converts a csv file name into a table. Example::

    "abc.csv" | toCsv()       # returns table of values (Iterator[List[str]])
    "abc.csv" | toCsv()       # returns pd.DataFrame, if configure 'settings.toCsv.df = True'
    "def.xlsx" | toCsv()      # returns table of values in the first sheet
    "def.xlsx" | toCsv(True)  # returns List[Sheet name (str), table of values]
    ["a,b,c,d", "1,2,3,4"] | toCsv() | deref() # returns [['a', 'b', 'c', 'd'], ['1', '2', '3', '4']]

.. warning::

    Note that this is pretty slow compared to just splitting by commas. If your
    dataset doesn't have anything complicated like commas in quotes, then just
    do ``op().split(",").all()``

    If your dataset does have complicated quotes, then I'd suggest reading the
    csv using this cli, then convert it to a tsv file (tab-separated value). Then
    you can always just split the string using tab characters

:param allSheets: if input is an Excel sheet, whether to read in all sheets or
    just the first sheet. No effect if input is a normal csv file"""
        self.allSheets = allSheets
    def __ror__(self, fn:"str | Iterator[str|bytes]"):
        """Parses ``fn``, which is either a file name or an iterator of lines.

        :param fn: path to a csv/xls/xlsx file, or an iterator of csv lines
            (str, or bytes which are decoded with the default codec)"""
        if not isinstance(fn, str):
            # csv.reader only accepts text, so bytes lines are decoded on the fly
            return csv.reader(l.decode() if isinstance(l, bytes) else l for l in fn)
        fn = os.path.expanduser(fn)
        if fn.endswith((".xls", ".xlsx")):
            if self.allSheets: return pd.read_excel(fn, sheet_name=None).items()
            return pd.read_excel(fn)
        if settings.toCsv.df: return pd.read_csv(fn)
        def gen():
            # newline="" is what the csv module asks for, so that newlines
            # embedded in quoted fields are interpreted correctly
            with open(fn, newline="") as f: yield from csv.reader(f)
        return gen()
class toYaml(BaseCli):
    blurb="Converts file name/yaml string to object and object to yaml string"
    def __init__(self, mode=None, safe=True):
        """Converts file name/yaml string to object and object to yaml string. Example::

    "some_file.yaml" | toYaml() # returns python object
    cat("some_file.yaml") | join("\\n") | toYaml(1) # returns python object
    {"some": "object", "arr": [1, 2]} | toYaml() # returns yaml string. Detected object coming in, instead of string, so will convert object into yaml string

:param mode: None (default) for figure it out automatically, 0 for
    loading from file name, 1 for loading from raw yaml string,
    2 for converting object to yaml string
:param safe: if True, always use safe_load() instead of load()"""
        self.mode = mode; self.safe = safe
    def _load(self, stream):
        """Parses ``stream`` (open file or yaml string), honoring ``self.safe``."""
        # with safe=True there is no fallback to the unrestricted loader:
        # a document safe_load() rejects raises instead of being re-parsed
        if self.safe: return yaml.safe_load(stream)
        try: loader = yaml.FullLoader
        except AttributeError: return yaml.load(stream) # for previous versions, which have no FullLoader
        return yaml.load(stream, loader)
    def __ror__(self, it):
        """Non-string input (or mode 2) is dumped to a yaml string. String
        input is opened as a file name (mode 0 or None), or parsed directly
        as yaml text (any other mode)."""
        mode = self.mode
        if not isinstance(it, str) or mode == 2: return yaml.dump(it)
        if mode == 0 or mode is None:
            with open(it) as f: return self._load(f)
        return self._load(it)
import validators, shutil, html, io, os; pydub = k1lib.dep("pydub", url="") # toYaml
class Audio:
    """Wrapper around a ``pydub`` audio segment, kept in ``self.raw``."""
    def __init__(self, raw:"pydub.audio_segment.AudioSegment"): self.raw = raw
    def resample(self, rate) -> "Audio":
        """Resamples the audio in place and refreshes ``self.data`` and ``self.rate``.

        :param rate: new frame rate. If falsy, the frame rate is left untouched
        :return: this same object, to allow chaining"""
        if rate:
            self.raw = self.raw.set_frame_rate(rate)
        # NOTE(review): 2.15e9 is roughly 2**31, presumably to scale 32-bit
        # samples into [-1, 1] - confirm against the expected sample width
        self.data = np.array(self.raw.get_array_of_samples())/2.15e9
        self.rate = self.raw.frame_rate
        return self
    def _toBytes(self, dataType) -> bytes:
        """Hook used by toBytes(): exports in ``dataType`` format, "wav" if not given."""
        f = io.BytesIO(); self.raw.export(f, format=(dataType or "wav")); return f.getvalue()
    def __repr__(self): return f"<Audio duration={k1lib.fmt.time(self.raw.duration_seconds)} rate={self.raw.frame_rate}>"
    def __len__(self): return int(self.raw.frame_count())
    def __getitem__(self, slice_):
        """Returns a new Audio built from the sliced samples. Only slices
        are supported, anything else returns None."""
        if not isinstance(slice_, slice): return None
        # samples are grouped by channel count before slicing
        data = np.array(self.raw.get_array_of_samples()) | cli.batched(self.raw.channels) | cli.op()[slice_]
        return Audio(pydub.AudioSegment(data.tobytes(), frame_rate=self.raw.frame_rate, sample_width=self.raw.sample_width, channels=self.raw.channels))
    def _repr_html_(self): # plays a short sample, first 10s or sth like that
        return f"{html.escape(self.__repr__())}<br>{self.raw[:10000]._repr_html_()}"
class toAudio(BaseCli):
    blurb="Reads audio from either a file or a URL or from bytes"
    def __init__(self, rate=None):
        """Reads audio from either a file or a URL or from bytes directly. Example::

    au = "some_file.wav" | toAudio() # can display in a notebook, which will preview the first 10 second
    au | toBytes()      # exports audio as .wav file
    au | toBytes("mp3") # exports audio as .mp3 file
    au.resample(16000)  # resamples audio to new rate
    au | head(0.1)      # returns new Audio that has the first 10% of the audio only
    au | splitW(8, 2)   # splits Audio into 2 Audios, first one covering 80% and second one covering 20% of the track
    au.raw              # internal pydub.AudioSegment object. If displayed in a notebook, will play the whole thing

You can also use this on any Youtube video or random mp3 links online and on raw bytes::

    "https://example.com/some_song.mp3" | toAudio() # grab a song from the internet
    cat("some_file.wav", False) | toAudio() # grab from raw bytes of mp3 or wav, etc.

:param rate: if specified, resamples the audio to this frame rate"""
        self.rate = rate
    def __ror__(self, it:"str|bytes") -> Audio:
        """Loads the audio.

        :param it: path to an existing file, a URL (downloaded using the
            ``yt-dlp`` binary), or the raw bytes of an audio file
        :raises Exception: if the string is neither an existing file nor a
            URL, if ``yt-dlp`` is needed but missing, or if the input type
            is unsupported"""
        import shlex
        if isinstance(it, str):
            if os.path.exists(os.path.expanduser(it)): fn = os.path.expanduser(it); tmp = False
            elif validators.url(it):
                if not shutil.which("yt-dlp"): raise Exception(f"'{it}' looks like a link, but the required 'yt-dlp' binary is not found. Please install it by doing `pip install yt-dlp`")
                # quoted because URLs routinely contain characters like "&" and
                # "?" that would otherwise be interpreted by the shell
                fn = None | cli.cmd(f"yt-dlp -o - -x {shlex.quote(it)}", mode=0, text=False) | cli.item() | cli.file(); tmp = True
            else: raise Exception(f"The file '{it}' does not exist, and it doesn't look like a URL")
        elif isinstance(it, bytes): fn = it | cli.file(); tmp = True
        else: raise Exception(f"Unknown {type(it)} audio type")
        try: return Audio(pydub.AudioSegment.from_file(fn)).resample(self.rate)
        finally:
            # temp files are removed even if decoding fails
            if tmp: os.remove(fn)
import datetime; from datetime import datetime as dt # toAudio dateutil = k1lib.dep("dateutil", url="") # toAudio
class toUnix(BaseCli):
    blurb="Converts to unix timestamp"
    def __init__(self, tz:"str | dateutil.tz.tz.tzfile"=None, mode:int=0):
        """Tries anything piped in into a unix timestamp. If can't convert
then return None or the current timestamp (depending on mode). Example::

Local time zone independent::

    "2023" | toUnix()                      # returns 2023, or 2023 seconds after unix epoch. Might be undesirable, but has to support raw ints/floats
    "2023-11-01T00Z" | toUnix()            # midnight Nov 1st 2023 GMT
    "2023-11-01T00:00:00-04:00" | toUnix() # midnight Nov 1st 2023 EST
    "2023-11-01" | toUnix("US/Pacific")    # midnight Nov 1st 2023 PST
    "2023-11-01" | toUnix("UTC")           # midnight Nov 1st 2023 UTC

Local time zone dependent (assumes EST)::

    "2023-11" | toUnix()    # if today's Nov 2nd EST, then this would be 1698897600, or midnight Nov 2nd 2023 EST
    "2023-11-04" | toUnix() # midnight Nov 4th 2023 EST

Feel free to experiment more, but in general, this is pretty versatile in what it can convert.
With more effort, I'd probably make this so that every example given will not depend on local time,
but since I just use this to calculate time differences, I don't really care.

:param tz: Timezone, like "US/Eastern", "US/Pacific". If not specified, then assumes local timezone.
    Get all available timezones by executing ``toUnix.tzs()``
:param mode: if 0, then returns None if can't convert, to catch errors quickly. If 1, then returns current timestamp instead"""
        self.mode = mode
        if isinstance(tz, datetime.tzinfo): self.tz = tz
        else:
            self.tz = dateutil.tz.gettz(tz)
            # tz=None is fine (local time zone); only a name that fails to resolve is an error
            if self.tz is None and tz: raise Exception(f"Timezone '{tz}' not found. You can get a list of all available timezones at `toUnix.tzs()`")
    @staticmethod
    def tzs():
        """Lists all time zone names known to dateutil's bundled zone file."""
        return list(dateutil.zoneinfo.get_zonefile_instance().zones.keys())
    def __ror__(self, t):
        """Converts ``t``: datetime objects use their own timestamp,
        datetime64 numpy data is cast to int, anything ``float()`` accepts
        is returned as a float, and everything else goes through the
        dateutil parser."""
        if isinstance(t, datetime.datetime): return t.timestamp()
        # NOTE(review): astype(int) yields counts in the array's own datetime64
        # unit, which may not be seconds - confirm against callers
        if hasattr(t, "dtype") and isinstance(t.dtype, np.dtypes.DateTime64DType): return t.astype(int)
        try: return float(t)
        except Exception:
            try:
                a = dateutil.parser.parse(t)
                if self.tz: a = a.replace(tzinfo=self.tz)
                return a.timestamp()
            except Exception: return time.time() if self.mode else None
class toIso(BaseCli):
    blurb="Converts unix timestamp to a human readable time string"
    def __init__(self, tz:"str | dateutil.tz.tz.tzfile"=None):
        """Converts unix timestamp into ISO 8601 string format.
Example::

    1701382420 | toIso()            # returns '2023-11-30T17:13:40', which is correct in EST time
    1701382420 | toIso() | toUnix() # returns 1701382420, the input timestamp, showing it's correct
    1701382420.123456789 | toIso()  # still returns '2023-11-30T17:13:40'

As you might have noticed, this cli depends on the timezone of the host computer. If you want
to get it in a different timezone, do this::

    1701382420 | toIso("UTC")        # returns '2023-11-30T22:13:40'
    1701382420 | toIso("US/Pacific") # returns '2023-11-30T14:13:40'

:param tz: Timezone, like "US/Eastern", "US/Pacific". If not specified, then assumes local timezone.
    Get all available timezones by executing ``toUnix.tzs()``"""
        if isinstance(tz, datetime.tzinfo): self.tz = tz
        else:
            self.tz = dateutil.tz.gettz(tz)
            # tz=None is fine (local time zone); only a name that fails to resolve is an error
            if self.tz is None and tz: raise Exception(f"Timezone '{tz}' not found. You can get a list of all available timezones at `toUnix.tzs()`")
    def __ror__(self, it):
        """Formats the timestamp ``it`` as ``YYYY-MM-DDTHH:MM:SS``, in
        ``self.tz`` if one was given, else in the local time zone."""
        d = dt.fromtimestamp(it)
        if self.tz: d = d.astimezone(self.tz)
        return d.strftime('%Y-%m-%dT%H:%M:%S')
class toYMD(BaseCli):
    blurb="Converts unix timestamp into tuple (year, month, day, hour, minute, second)"
    def __init__(self, idx=None, mode=int):
        """Converts unix timestamp into tuple (year, month, day, hour, minute, second).
Example::

    1701382420 | toYMD()  # returns [2023, 11, 30, 17, 13, 40] in EST timezone
    1701382420 | toYMD(0) # returns 2023
    1701382420 | toYMD(1) # returns 11

    1701382395 | toYMD(mode=str) # returns ['2023', '11', '30', '17', '13', '15']

:param idx: if specified, take the desired element only. If 0, then take year, 1, then month, etc.
:param mode: either int or str. If str, then returns nicely adjusted numbers"""
        self.idx = idx; self.mode = mode
    def __ror__(self, it):
        """Splits the timestamp ``it`` (interpreted in the local time zone)
        into its 6 components, or returns just component ``self.idx``."""
        d = dt.fromtimestamp(it)
        res = [d.year, d.month, d.day, d.hour, d.minute, d.second]
        if self.mode != int:
            # string mode: year as-is, every other field zero-padded to 2 characters
            res = [f"{res[0]}", *(f"{e}".rjust(2,"0") for e in res[1:])]
        return res if self.idx is None else res[self.idx]
# Registers the "toLinks" settings group with 2 fields: "splitChars" and
# "protocols" (see each description string for its meaning).
settings.add("toLinks", k1lib.Settings()\
             .add("splitChars", ["<br>", "<div ", *"\n\t<> ,;"], "characters/strings to split the lines by, so that each link has the opportunity to be on a separate line, so that the first instance in a line don't overshadow everything after it")\
             .add("protocols", ["http", "https", "ftp"], "list of recognized protocols to search for links, like 'http' and so on"), "conv.toLinks() settings");
class toMovingAvg(BaseCli):
    blurb="Smoothes out sequential data using some momentum and debias values"
    def __init__(self, col:int=None, alpha=0.9, debias=True, v:float=0, dt:float=1):
        """Smoothes out sequential data using momentum.
Example::

    # returns [4.8, 4.62, 4.458]. 4.8 because 0.9*5 + 0.1*3 = 4.8, and so on
    [3, 3, 3] | toMovingAvg(v=5, debias=False) | deref()

Sometimes you want to ignore the initial value, then you can turn on debias mode::

    x = np.linspace(0, 10, 100); y = np.cos(x)
    plt.plot(x, y)
    plt.plot(x, y | toMovingAvg(debias=False) | deref())
    plt.plot(x, y | toMovingAvg(debias=False, alpha=0.95) | deref())
    plt.plot(x, y | toMovingAvg(debias=True) | deref())
    plt.plot(x, y | toMovingAvg(debias=True, alpha=0.95) | deref())
    plt.legend(["Signal", "Normal - 0.9 alpha", "Normal - 0.95 alpha", "Debiased - 0.9 alpha", "Debiased - 0.95 alpha"], framealpha=0.3)
    plt.grid(True)

.. image:: ../images/movingAvg.png

As you can see, normal mode still has the influence of the initial value at
0 and can't rise up fast, whereas the debias mode will ignore the initial
value and immediately snaps to the first value.

Also, the 2 graphs with 0.9 alpha snap together quicker than the 2 graphs
with 0.95 alpha. Here's the effect of several alpha values:

.. image:: ../images/movingAvg-alphas.png

:param col: column to apply moving average to
:param alpha: momentum term
:param debias: whether to turn on debias mode or not
:param v: initial value, doesn't matter in debias mode
:param dt: pretty much never used, hard to describe, belongs to debias mode, checkout source code for details"""
        self.col = col; self.initV = v; self.alpha = alpha; self.debias = debias; self.dt = dt
        if debias and v != 0: raise Exception("Debias mode activated! This means that the initial value doesn't matter, yet you've specified one")
        if alpha > 1 or alpha < 0: raise Exception("Alpha is outside the [0, 1] range. which does not make sense")
    def __ror__(self, it):
        """Returns a generator of smoothed values (``col`` is None) or of
        rows with column ``col`` replaced by its smoothed value. DataFrames
        get the chosen column replaced and are returned as DataFrames.

        :raises Exception: if applied on a DataFrame without a column"""
        col = self.col
        if hasPandas and isinstance(it, pd.DataFrame):
            if col is None: raise Exception("toMovingAvg(col=None) applied on a DataFrame doesn't make much sense, does it?")
            return it.replaceCol(list(it)[col], list(it[list(it)[col]] | toMovingAvg(None, self.alpha, self.debias, self.initV, self.dt)))
        def gen():
            m = self.initV; alpha = self.alpha
            debias = self.debias; dt = self.dt; t = 1; tooSmall = False
            for row in it:
                v = row if col is None else row[col]
                m = m * alpha + v * (1 - alpha)
                if debias and not tooSmall:
                    exp = alpha**t; value = m / (1 - exp)
                    # once alpha**t is negligible the correction is skipped
                    # for good, to avoid the exponential on every element. The
                    # same threshold applies with or without a column
                    tooSmall = 10*exp < (1-alpha); t += dt
                else: value = m
                yield value if col is None else [*row[:col], value, *row[col+1:]]
        return gen()
class toCm(BaseCli):
    blurb="Converts the specified column to a bunch of color values, and adds a matplotlib colorbar automatically"
    def __init__(self, col:int, cmap=None, title:str=None):
        """Converts the specified column to a bunch of color values, and adds a matplotlib colorbar automatically.
"cm" = "color map". Example::

    import matplotlib.cm as cm
    exps = [1, 2, 3, 4, 5]
    x = np.linspace(-2, 2)
    data = exps | apply(lambda exp: [exp, x, x**exp]) | deref()

    # without toCm(), plots fine, demonstrates underlying mechanism, but doesn't allow plotting a separate colorbar
    data | normalize(0, mode=1) | apply(cm.viridis, 0) | ~apply(lambda c,x,y: plt.plot(x, y, color=c)) | ignore()
    # with toCm(), draws a colorbar automatically
    data | toCm(0, cm.viridis, "Exponential") | ~apply(lambda c,x,y: plt.plot(x, y, color=c)) | ignore()

.. image:: ../images/toCm.png

Functionality is kind of niche, but I need this over and over again, so have to make it

:param col: column to convert float/int to color (tuple of 4 floats)
:param cmap: colormap to use. If not specified, defaults to ``cm.viridis``
:param title: title of the colorbar, optional"""
        self.col = col; self.cmap = cmap or cm.viridis; self.title = title
    def _colorbar(self, values):
        """Draws a colorbar on the current axes spanning min..max of ``values``."""
        bounds = values | cli.toMin() & cli.toMax()
        mappable = cm.ScalarMappable(norm=plt.Normalize(*bounds), cmap=self.cmap)
        plt.colorbar(mappable, ax=plt.gca(), label=self.title)
    def __ror__(self, it):
        """Draws the colorbar, then returns the data with the chosen column
        (or every element, if ``col`` is None) normalized and mapped through
        the colormap."""
        col = self.col; cmap = self.cmap
        it = init.dfGuard(it)
        if col is None:
            # one-shot iterators are materialized because the data is walked twice
            if not isinstance(it, k1lib.settings.cli.arrayTypes): it = list(it)
            self._colorbar(it)
            return it | cli.normalize(None, 1) | cli.apply(cmap)
        it = it | cli.deref(2)
        self._colorbar(it | cli.cut(col))
        return it | cli.normalize(col, 1) | cli.apply(cmap, col)
PyPDF2 = k1lib.dep("PyPDF2", url="") # toCm
class Pdf:
    """Handle on a pdf file: keeps the file open together with a
    ``PyPDF2.PdfReader`` and hands out :class:`PdfPage` objects."""
    def __init__(self, fn):
        self.fn = fn
        self._handle = None
        self._open()
    def _open(self):
        """Opens the file and builds the reader, unless a handle already exists."""
        if self._handle is None:
            self._handle = open(self.fn, 'rb')
            self._reader = PyPDF2.PdfReader(self._handle)
            self._npages = len(self._reader.pages);
    def __iter__(self):
        return (PdfPage(self, pageIdx) for pageIdx in range(len(self)))
    def __getitem__(self, s):
        """Slice -> list of pages, anything else -> a single page."""
        if not isinstance(s, slice):
            return PdfPage(self, s)
        return [PdfPage(self, pageIdx) for pageIdx in range(len(self))[s]]
    def __getstate__(self):
        # the open file and the reader are dropped from the pickled state
        state = dict(self.__dict__)
        state["_handle"] = None
        state["_reader"] = None
        return state
    def __setstate__(self, d):
        self.__dict__.update(d)
    def __repr__(self):
        return f"<Pdf #pages={len(self)} '{self.fn}'>"
    def __del__(self):
        if self._handle:
            self._handle.close()
    def __len__(self):
        return self._npages
class PdfPage:  # PdfPage
    """A single page of a ``Pdf``, identified by its index in that pdf.

    Exposes ``_cat`` (text lines of the page) and ``_toImg`` (page rendered
    to an image); presumably these are the hooks that ``cat()`` and
    ``toImg()`` look for - confirm against those clis.

    :param pdf: the ``Pdf`` this page belongs to
    :param i: index of the page within the pdf"""  # PdfPage
    def __init__(self, pdf:"Pdf", i:int):  # PdfPage
        self.pdf = pdf; self.i = i  # PdfPage
    def __repr__(self): return f"<PdfPage page={self.i} #pages={len(self.pdf)} fn='{self.pdf.fn}'>"  # PdfPage
    @staticmethod  # PdfPage
    def _shQuote(s:str) -> str:  # PdfPage
        """Wraps ``s`` in single quotes for use in a shell command line.

        Nothing can be escaped inside single quotes, so every embedded ``'``
        closes the quote, adds a double-quoted ``'`` and reopens the quote."""  # PdfPage
        return "'" + s.replace("'", "'\"'\"'") + "'"  # PdfPage
    def _cat(self):  # PdfPage
        """Returns the text of this page as a list of lines."""  # PdfPage
        self.pdf._open()  # makes sure the reader exists, e.g. after unpickling # PdfPage
        return self.pdf._reader.pages[self.i].extract_text().split("\n")  # PdfPage
    def _toImg(self, **kwargs):  # PdfPage
        """Renders this page to a jpeg using the ``pdftoppm`` command line
        tool and returns it as whatever ``toImg()`` returns for that file.

        :param kwargs: accepted but not used"""  # PdfPage
        k1lib.depCli("pdftoppm")  # PdfPage
        # pdftoppm numbers pages from 1; negative indices are counted from the end first
        pageNo = (self.i if self.i >= 0 else len(self.pdf) + self.i) + 1  # PdfPage
        fn2 = b"" | cli.file(); jpg = f"{fn2}.jpg"  # PdfPage
        try:  # PdfPage
            None | cli.cmd(f"pdftoppm -f {pageNo} -l {pageNo} -jpeg {PdfPage._shQuote(self.pdf.fn)} {fn2} -singlefile") | cli.deref()  # PdfPage
            return jpg | cli.toImg()  # PdfPage
        finally:  # PdfPage
            # clean up both temp files even if rendering or loading failed
            for tmp in (jpg, fn2):  # PdfPage
                try: os.remove(tmp)  # PdfPage
                except FileNotFoundError: pass  # PdfPage
_pdf_initialized = [False]  # PdfPage
def _pdf_init():  # _pdf_init
    """Registers ``Pdf`` and ``PdfPage`` through ``k1lib.cli.init.addAtomic``.
    Only the first call does anything."""  # _pdf_init
    if _pdf_initialized[0]: return  # _pdf_init
    _pdf_initialized[0] = True; k1lib.cli.init.addAtomic(Pdf); k1lib.cli.init.addAtomic(PdfPage)  # _pdf_init
class toPdf(BaseCli):  # toPdf
    blurb="Reads a pdf file to a managed object and can do lots of downstream tasks from there"  # toPdf
    def __init__(self):  # toPdf
        """Reads a pdf file. Can do lots of downstream tasks. Example::

            pdf = "someFile.pdf" | toPdf()
            len(pdf)          # get number of pages
            pdf[2] | cat()    # get text content of the page at index 2 (0-indexed)
            pdf[2] | toImg()  # converts the page at index 2 to an image
        """  # toPdf
        _pdf_init()  # toPdf
    def __ror__(self, it) -> Pdf:  # toPdf
        """Wraps the given file name in a :class:`Pdf` object.

        :param it: path of the pdf file"""  # toPdf
        pdf = Pdf(it)  # toPdf
        return pdf  # toPdf
class toDist(BaseCli):  # toDist
    blurb="Calculates the euclidean distance of the input points"  # toDist
    def __init__(self, norm=2):  # toDist
        """Calculates the euclidean distance of the input points. Example::

            a = np.random.randn(3)
            b = np.random.randn(3)
            [a, b] | toDist()  # returns distance between those 2 points

        Essentially just ((a-b)**2).sum()**0.5. But I kept needing this over
        and over again so gotta make it into a separate cli.

        :param norm: order p of the distance, ``(|a-b|**p).sum()**(1/p)``.
            The default 2 is the euclidean distance, 1 the sum of absolute
            differences"""  # toDist
        self.norm = norm  # toDist
    def __ror__(self, it):  # toDist
        """Returns the distance between the 2 points in ``it``.

        :param it: pair ``(a, b)`` of objects supporting subtraction, ``abs()``,
            ``**`` and ``.sum()`` (numpy arrays in the example above)"""  # toDist
        a, b = it  # toDist
        # abs() is required for odd orders: without it negative differences cancel
        # positive ones (norm=1) or make the sum negative, giving nan (norm=3)
        return (abs(a-b)**self.norm).sum()**(1/self.norm)  # toDist
class toAngle(BaseCli):  # toAngle
    blurb="Calculates the angle between 2 vectors"  # toAngle
    def __init__(self, radians=True):  # toAngle
        """Calculates the angle between 2 vectors. Example::

            a = np.random.randn(3)
            b = np.random.randn(3)
            [a, b] | toAngle()  # returns angle between those 2 vectors

        :param radians: if True (default), the angle is returned in radians,
            else in degrees"""  # toAngle
        self.radians = radians; self.mult = 1 if radians else 180/math.pi  # toAngle
    def __ror__(self, it):  # toAngle
        """Returns the angle between the 2 vectors in ``it`` as a float.

        :param it: pair ``(a, b)`` of vectors supporting ``@``, ``**`` and ``.sum()``"""  # toAngle
        a, b = it  # toAngle
        la = (a**2).sum()**0.5; lb = (b**2).sum()**0.5  # toAngle
        cos = float(a@b/la/lb)  # toAngle
        # Rounding can push the cosine of (anti)parallel vectors marginally outside [-1, 1],
        # which math.acos rejects with "math domain error". nan is left untouched so that it
        # still propagates to the result instead of being turned into a valid-looking angle
        if not math.isnan(cos): cos = max(-1.0, min(1.0, cos))  # toAngle
        return math.acos(cos)*self.mult  # toAngle
class idxsToNdArray(BaseCli):  # idxsToNdArray
    blurb="Converts indices (aka point cloud) to numpy array"  # idxsToNdArray
    def __init__(self, ds:"tuple[int]"=None, n:int=None):  # idxsToNdArray
        """Converts indices (aka point cloud) to numpy array. Example::

            [[1,2], [2,3]] | idxsToNdArray()          # returns np.array([[0, 0, 0, 0], [0, 0, 1, 0], [0, 0, 0, 1]])
            [[1,2], [2,3]] | idxsToNdArray(n=2)       # returns np.array([[0, 0, 0, 0], [0, 0, 1, 0], [0, 0, 0, 1]])
            [[1,2], [2,3]] | idxsToNdArray(ds=[3,4])  # returns np.array([[0, 0, 0, 0], [0, 0, 1, 0], [0, 0, 0, 1]])

        So, the standard use case is that you have a point cloud (points [1,2] and
        [2,3]) and you want to get the dense array with those points filled in. Then
        you can do it with this function. Notice how in all 3 examples, the points
        are marked with a 1. You can specify either the dense array's shape using
        parameter ".ds", or just the number of dimensions with parameter ".n". If
        you specify neither then it will auto figure that out, but the final shape
        might not be what you wanted. If you specify both, they have to agree, and
        ".ds" decides the shape.

        Let's see some other use cases::

            [[1,2,3], [2,3,4]] | idxsToNdArray() | shape()  # returns (3, 4, 5)
            [[1,2,3], [2,3,4]] | idxsToNdArray(n=2)         # returns np.array([[0, 0, 0, 0], [0, 0, 3, 0], [0, 0, 0, 4]])
            [[1,2,3], [2,3,4]] | idxsToNdArray(n=1)         # returns np.array([[0, 0], [2, 3], [3, 4]])
            [[1,2,3,4], [2,3,4,5]] | idxsToNdArray(n=2)     # returns np.array([[[0, 0], [0, 0], [0, 0], [0, 0]], [[0, 0], [0, 0], [3, 4], [0, 0]], [[0, 0], [0, 0], [0, 0], [4, 5]]])

        In the first example, if you don't specify the dimensions, it will return a
        3d array, and the selected points will have the value 1. But if you insist
        that it should have 2 dimensions only, and the remaining columns should be
        the selected points' values, then you can either limit .n, or specify the
        shape .ds but only has length of 2. Notice how the second example got filled
        in by values 3 and 4 and not 1.

        :param ds: dimensions
        :param n: number of dimensions"""  # idxsToNdArray
        self.ds = ds; self.n = n  # idxsToNdArray
        if ds is not None and n is not None and len(ds) != n: raise Exception("Can specify either .ds or .n only. .n will be inferred from .ds")  # idxsToNdArray
    def __ror__(self, it):  # idxsToNdArray
        """Builds the dense array from the rows of ``it``.

        The first n columns of every row are the coordinates, the remaining
        columns are the value stored there (1 if there are none).

        :raises Exception: if the input is not 2-dimensional"""  # idxsToNdArray
        n = self.n; ds = self.ds  # idxsToNdArray
        it = init.dfGuard(it) | cli.deref(2) | cli.toNdArray()  # idxsToNdArray
        if len(it.shape) != 2: raise Exception("Input have to be a 2d array")  # idxsToNdArray
        width = len(it[0])  # total number of columns: coordinates + values # idxsToNdArray
        # F is the number of value columns. An explicit shape is checked first, so that it is
        # honored when both .ds and .n were given, instead of being replaced by an inferred one
        if ds is not None: n = len(ds); F = width - n  # idxsToNdArray
        elif n is not None: ds = it[:,:n].T | cli.toMax().all() | cli.op().astype(int)+1; F = width - n  # idxsToNdArray
        else: n = width; F = 1; ds = it.T | cli.toMax().all() | cli.op().astype(int)+1  # idxsToNdArray
        # no value columns at all: mark every point with a 1
        if width == n: it = np.hstack([it, np.ones(it.shape[0])[:,None]])  # idxsToNdArray
        sel = tuple(it[:,:n].T.astype(int).tolist())  # one index list per dimension # idxsToNdArray
        if F > 1: arr = np.zeros((*ds,F)); arr[sel] = it[:,n:]  # idxsToNdArray
        else: arr = np.zeros(ds); arr[sel] = it[:,n]  # idxsToNdArray
        return arr  # idxsToNdArray
# One-element list used as a mutable module-level slot: the first toFileType() instance stores the result of
# `b"" | cli.file()` in it, and toFileType.__ror__ uses that value as the prefix of its per-call temp file names
_toFileType_tmpFile = [None] # idxsToNdArray
class toFileType(BaseCli):  # toFileType
    blurb="Grab file type of a file or file contents (bytes)"  # toFileType
    def __init__(self):  # toFileType
        """Grab file type of a file or file contents. Example::

            # returns "PNG image data, 1024 x 1365, 8-bit/color RGBA, non-interlaced"
            "some_image.png" | toFileType()
            # returns "JPEG image data, JFIF standard 1.01, aspect ratio, density 1x1, segment length 16, baseline, precision 8, 1024x1365, components 3"
            "some_image.png" | toImg() | toBytes() | toFileType()

        This does take quite a while to execute, up to 42ms/file, so if you're
        doing it a lot, would suggest you use :class:`~k1lib.cli.modifier.applyMp`
        or something like that. Internally, this will call the command line
        program ``file`` and returns its results, so this is just a convenience cli."""  # toFileType
        # the shared temp file name is created once and reused as a prefix by every instance
        if _toFileType_tmpFile[0] is None: _toFileType_tmpFile[0] = b"" | cli.file()  # toFileType
        self.autoInc = k1lib.AutoIncrement()  # toFileType
    def __ror__(self, it):  # toFileType
        """Returns the description printed by ``file``, with the leading
        ``"<file name>: "`` removed.

        :param it: path of the file (str, ``~`` is expanded) or the file's content (bytes)
        :raises Exception: if ``it`` is neither str nor bytes"""  # toFileType
        if isinstance(it, str):  # toFileType
            # a ' inside shell single quotes has to be written as '"'"'
            fn = os.path.expanduser(it); quoted = fn.replace("'", """'"'"'""")  # toFileType
            res = None | cli.cmd(f"file '{quoted}'") | cli.item()  # toFileType
        elif isinstance(it, bytes):  # toFileType
            # pid + per-instance counter in the name keep the temp files of different calls apart
            fn = f"{_toFileType_tmpFile[0]}_{os.getpid()}_{self.autoInc()}"  # toFileType
            try:  # toFileType
                it | cli.file(fn); res = None | cli.cmd(f"file {fn}") | cli.item()  # toFileType
            finally:  # toFileType
                # remove the temp file even when writing it or running `file` failed
                try: os.remove(fn)  # toFileType
                except FileNotFoundError: pass  # toFileType
        else: raise Exception("toFileType() only accepts either path (string) or file content (bytes)")  # toFileType
        return res.replace(f"{fn}: ", "")  # toFileType