Source code for k1lib._monkey

# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
import k1lib, math, numpy as np, time, signal, os, warnings, traceback, json
from k1lib import cli
from typing import List, Tuple, ContextManager
from contextlib import contextmanager
try: import torch; from torch import nn; hasTorch = True
except: hasTorch = False
try: import matplotlib.animation
except: pass
try: import matplotlib.pyplot as plt
except: pass
__all__ = ["dummy"]
[docs]def dummy(): # dummy """Does nothing. Only here so that you can read source code of this file and see what’s up.""" # dummy pass # dummy
settings = k1lib.settings.add("monkey", k1lib.Settings(), "monkey-patched settings").monkey # dummy k1lib._settings.add("monkey", k1lib.Settings(), "monkey-patched settings") # dummy try: # dummy import forbiddenfruit # dummy def hasNan(self): # dummy """Checks whether numpy array has nan or not""" # dummy return np.isnan(self).any() # dummy forbiddenfruit.curse(np.ndarray, "hasNan", hasNan) # dummy def clearNan(self, value:float=0.0) -> np.ndarray: # dummy """Sets all nan values to a specified value. Example:: a = np.random.randn(3, 3) * float("nan") a.clearNan() # now full of zeros""" # dummy self[self != self] = value; return self # dummy forbiddenfruit.curse(np.ndarray, "clearNan", clearNan) # dummy except: pass # dummy if hasTorch: # dummy @k1lib.patch(nn.Module) # dummy def importParams(self:nn.Module, params:List[nn.Parameter]): # dummy """Given a list of :class:`torch.nn.parameter.Parameter`/:class:`torch.Tensor`, update the current :class:`torch.nn.Module`'s parameters with it'""" # dummy for oldParam, newParam in zip(self.parameters(), params): # dummy oldParam.data = newParam.data.clone() # dummy @k1lib.patch(nn.Module) # dummy def exportParams(self:nn.Module) -> List[torch.Tensor]: # dummy """Gets the list of :class:`torch.Tensor` data""" # dummy return [param.data.clone() for param in self.parameters()] # dummy class ParamsContext: # dummy def __init__(self, m:nn.Module): self.m = m # dummy def __enter__(self): self.params = self.m.exportParams(); return self.params # dummy def __exit__(self, *ignored): self.m.importParams(self.params) # dummy if hasTorch: # dummy @k1lib.patch(nn.Module) # dummy @contextmanager # dummy def paramsContext(self:nn.Module): # dummy """A nice context manager for :meth:`importParams` and :meth:`exportParams`. Returns the old parameters on enter context. Example:: m = nn.Linear(2, 3) with m.paramsContext() as oldParam: pass # go wild, train, mutate `m` however much you like # m automatically snaps back to the old param Small reminder that this is not foolproof, as there are some :class:`~torch.nn.Module` that stores extra information not accessible from the model itself, like :class:`~torch.nn.BatchNorm2d`.""" # dummy params = self.exportParams() # dummy try: yield # dummy finally: self.importParams(params) # dummy if hasTorch: # dummy @k1lib.patch(nn.Module) # dummy def getParamsVector(model:nn.Module) -> List[torch.Tensor]: # dummy """For each parameter, returns a normal distributed random tensor with the same standard deviation as the original parameter""" # dummy answer = [] # dummy for param in model.parameters(): # dummy a = torch.randn(param.shape).to(param.device) # dummy b = param.std() if param.numel() > 1 else 1 # dummy answer.append(a * b) # dummy return answer # dummy from k1lib.cli import apply, deref, op, item # dummy if hasTorch: # dummy @k1lib.patch(nn.Module) # dummy @contextmanager # dummy def deviceContext(self:nn.Module, buffers:bool=True) -> ContextManager: # dummy """Preserves the device of whatever operation is inside this. Example:: import torch.nn as nn m = nn.Linear(3, 4) with m.deviceContext(): m.cuda() # moves whole model to cuda # automatically moves model to cpu This is capable of preserving buffers' devices too. But it might be unstable. :class:`~torch.nn.parameter.Parameter` are often updated inline, and they keep their old identity, which makes it easy to keep track of which device the parameters are on. However, buffers are rarely updated inline, so their identities change all the time. To deal with this, this does something like this:: devices = [buf.device for buf in self.buffers()] yield # entering context manager for buffer, device in zip(self.buffers(), devices): buffer.data = buffer.data.to(device=device) This means that while inside the context, if you add a buffer anywhere to the network, buffer-device alignment will be shifted and wrong. So, register all your buffers (aka Tensors attached to :class:`~torch.nn.Module`) outside this context to avoid headaches, or set ``buffers`` option to False. If you don't know what I'm talking about, don't worry and just leave as default. :param buffers: whether to preserve device of buffers (regular Tensors attached to :class:`~torch.nn.Module`) or not.""" # dummy pDevs = self.parameters() | apply(lambda t: (t, t.device)) | deref() # dummy if buffers: pbDevs = pbDevs = self.modules() |\ apply(lambda m: (m, m | op().buffers(recurse=False) | op().device.all() | deref())) | deref(maxDepth=1) # dummy try: yield # dummy finally: # dummy for p, dev in pDevs: p.data = p.data.to(device=dev) # dummy if buffers: # dummy for m, bDevs in pbDevs: # dummy for buf, dev in zip(m.buffers(recurse=False), bDevs): # dummy buf.data = buf.data.to(device=dev) # dummy if hasTorch: # dummy @k1lib.patch(nn.Module) # dummy @contextmanager # dummy def gradContext(self): # dummy """Preserves the requires_grad attribute. Example:: m = nn.Linear(2, 3) with m.gradContext(): m.weight.requires_grad = False # returns True m.weight.requires_grad It's worth mentioning that this does not work with buffers (Tensors attached to :class:`torch.nn.Module`), as buffers are not meant to track gradients!""" # dummy grads = [(p, p.requires_grad) for p in self.parameters()] # dummy try: yield # dummy finally: # dummy for p, grad in grads: p.requires_grad_(grad) # dummy if hasTorch: # dummy @k1lib.patch(nn.Module) # dummy def __ror__(self, x): # dummy """Allows piping input to :class:`torch.nn.Module`, to match same style as the module :mod:`k1lib.cli`. Example:: # returns torch.Size([5, 3]) torch.randn(5, 2) | nn.Linear(2, 3) | cli.shape()""" # dummy return self(x) # dummy if hasTorch: # dummy @k1lib.patch(nn.Module, name="nParams") # dummy @property # dummy def nParams(self): # dummy """Get the number of parameters of this module. Example:: # returns 9, because 6 (2*3) for weight, and 3 for bias nn.Linear(2, 3).nParams""" # dummy return sum([p.numel() for p in self.parameters()]) # dummy if hasTorch: # dummy @k1lib.patch(torch) # dummy @k1lib.patch(torch.Tensor) # dummy def crissCross(*others:Tuple[torch.Tensor]) -> torch.Tensor: # dummy """Concats multiple 1d tensors, sorts it, and get evenly-spaced values. Also available as :meth:`torch.crissCross` and :meth:`~k1lib.cli.others.crissCross`. Example:: a = torch.tensor([2, 2, 3, 6]) b = torch.tensor([4, 8, 10, 12, 18, 20, 30, 35]) # returns tensor([2, 3, 6, 10, 18, 30]) a.crissCross(b) # returns tensor([ 2, 4, 8, 10, 18, 20, 30, 35]) a.crissCross(*([b]*10)) # 1 "a" and 10 "b"s # returns tensor([ 2, 2, 3, 6, 18]) b.crissCross(*([a]*10)) # 1 "b" and 10 "a"s Note how in the second case, the length is the same as tensor b, and the contents are pretty close to b. In the third case, it's the opposite. Length is almost the same as tensor a, and the contents are also pretty close to a.""" # dummy return torch.cat([o.flatten() for o in others]).sort()[0][::len(others)] # dummy if hasTorch: # dummy @k1lib.patch(torch) # dummy def sameStorage(a, b): # dummy """Check whether 2 (:class:`numpy.ndarray` or :class:`torch.Tensor`) has the same storage or not. Example:: a = np.linspace(2, 3, 50) # returns True torch.sameStorage(a, a[:5]) # returns True torch.sameStorage(a[:10], a[:5]) returns false torch.sameStorage(a[:10], np.linspace(3, 4)) All examples above should work with PyTorch tensors as well.""" # dummy if isinstance(a, torch.Tensor) and isinstance(b, torch.Tensor): # dummy return a.data_ptr() == b.data_ptr() # dummy if isinstance(a, np.ndarray) and isinstance(b, np.ndarray): # dummy return a.base is b or b.base is a or a.base is b.base # dummy return a is b # dummy if hasTorch: # dummy @k1lib.patch(torch.Tensor) # dummy def histBounds(self:torch.Tensor, bins=100) -> torch.Tensor: # dummy r"""Flattens and sorts the tensor, then get value of tensor at regular linspace intervals. Does not guarantee bounds' uniqueness. Example:: # Tensor with lots of 2s and 5s a = torch.Tensor([2]*5 + [3]*3 + [4] + [5]*4) # returns torch.tensor([2., 3., 5.]) a.histBounds(3).unique() The example result essentially shows 3 bins: :math:`[2, 3)`, :math:`[3, 5)` and :math:`[5, \infty)`. This might be useful in scaling pixels so that networks handle it nicely. Rough idea taken from fastai.medical.imaging.""" # dummy sortedTensor = self.flatten().sort()[0] # dummy ls = torch.linspace(0, 1, bins); ls[-1] = 1-1e-6 # dummy bigLs = (ls * len(sortedTensor)).long() # dummy return sortedTensor[bigLs] # dummy if hasTorch: # dummy @k1lib.patch(torch.Tensor) # dummy def histScaled(self:torch.Tensor, bins=100, bounds=None) -> torch.Tensor: # dummy """Scale tensor's values so that the values are roughly spreaded out in range :math:`[0, 1]` to ease neural networks' pain. Rough idea taken from fastai.medical.imaging. Example:: # normal-distributed values a = torch.randn(1000) # plot #1 shows a normal distribution plt.hist(a.numpy(), bins=30); plt.show() # plot #2 shows almost-uniform distribution plt.hist(a.histScaled().numpy()); plt.show() Plot #1: .. image:: images/histScaledNormal.png Plot #2: .. image:: images/histScaledUniform.png :param bins: if ``bounds`` not specified, then will scale according to a hist with this many bins :param bounds: if specified, then ``bins`` is ignored and will scale according to this. Expected this to be a sorted tensor going from ``min(self)`` to ``max(self)``.""" # dummy if bounds is None: bounds = self.histBounds(bins).unique() # dummy else: bounds = bounds.unique() # dummy out = np.interp(self.numpy().flatten(), bounds, np.linspace(0, 1, len(bounds))) # dummy return torch.tensor(out).reshape(self.shape) # dummy if hasTorch: # dummy @k1lib.patch(torch.Tensor) # dummy def positionalEncode(t:torch.Tensor, richFactor:float=2) -> torch.Tensor: # dummy r"""Position encode a tensor of shape :math:`(L, F)`, where :math:`L` is the sequence length, :math:`F` is the encoded features. Will add the encodings directly to the input tensor and return it. This is a bit different from the standard implementations that ppl use. This is exactly: .. math:: p = \frac{i}{F\cdot richFactor} .. math:: w = 1/10000^p .. math:: pe = sin(w * L) With ``i`` from range [0, F), and ``p`` the "progress". If ``richFactor`` is 1 (original algo), then ``p`` goes from 0% to 100% of the features. Example:: import matplotlib.pyplot as plt, torch, k1lib plt.figure(dpi=150) plt.imshow(torch.zeros(100, 10).positionalEncode().T) .. image:: images/positionalEncoding.png :param richFactor: the bigger, the richer the features are. A lot of times, I observe that the features that are meant to cover huge scales are pretty empty and don't really contribute anything useful. So this is to bump up the usefulness of those features""" # dummy seqN, featsN = t.shape # dummy feats = torch.tensor(range(featsN)); w = (1/10000**(feats/featsN/richFactor))[None, :].expand(t.shape) # dummy times = torch.tensor(range(seqN))[:, None].expand(t.shape) # dummy t[:] = torch.sin(w * times); return t # dummy if hasTorch: # dummy @k1lib.patch(torch.Tensor) # dummy def clearNan(self, value:float=0.0) -> torch.Tensor: # dummy """Sets all nan values to a specified value. Example:: a = torch.randn(3, 3) * float("nan") a.clearNan() # now full of zeros""" # dummy self[self != self] = value # dummy return self # dummy if hasTorch: # dummy @k1lib.patch(torch.Tensor) # dummy def hasNan(self) -> bool: # dummy """Returns whether this Tensor has any nan values at all.""" # dummy return (self != self).sum() > 0 # dummy if hasTorch: # dummy @k1lib.patch(torch.Tensor) # dummy def stats(self) -> Tuple[float, float]: # dummy return self.mean(), self.std() # dummy inf = float("inf") # dummy if hasTorch: # dummy @k1lib.patch(torch.Tensor) # dummy def hasInfs(self): # dummy """Whether this Tensor has negative or positive infinities.""" # dummy return (self == inf).any() or (self == -inf).any() # dummy if hasTorch: # dummy @k1lib.patch(torch) # dummy def loglinspace(a, b, n=100, **kwargs): # dummy """Like :meth:`torch.linspace`, but spread the values out in log space, instead of linear space. Different from :meth:`torch.logspace`""" # dummy return math.e**torch.linspace(math.log(a), math.log(b), n, **kwargs) # dummy @k1lib.patch(torch) # dummy def transpose_axes(it:"torch.Tensor", axes:"tuple[int]"): # dummy """Transpose but with numpy-like signature. Example:: torch.transpose_axes(torch.randn(3,4,5,6), (2,3,1,0)) | shape() # returns (5,6,4,3) """ # dummy n = len(axes); d = {x:x for x in range(n)} # dummy if len(axes) != len(it.shape): raise ValueError("axes don't match tensor") # dummy for a in range(n): # dummy b = d[axes[a]]; x = d[a]; y = d[b]; t = d[x]; d[x] = d[y]; d[y] = t # dummy if a != b: it = torch.transpose(it, a, b) # dummy return it # dummy @k1lib.patch(np) # dummy def loglinspace(a, b, n=100, **kwargs): # loglinspace """Like :meth:`torch.linspace`, but spread the values out in log space, instead of linear space. Different from :meth:`torch.logspace`""" # loglinspace return math.e**np.linspace(math.log(a), math.log(b), n, **kwargs) # loglinspace try: # loglinspace import graphviz # loglinspace @k1lib.patch(graphviz.Digraph, "__call__") # loglinspace @k1lib.patch(graphviz.Graph, "__call__") # loglinspace def _call(self, _from, *tos, **kwargs): # loglinspace """Convenience method to quickly construct graphs. Example:: g = k1lib.graph() g("a", "b", "c") g # displays arrows from "a" to "b" and "a" to "c" """ # loglinspace for to in tos: self.edge(_from, to, **kwargs) # loglinspace except: pass # loglinspace try: # loglinspace import matplotlib.pyplot as plt # loglinspace from mpl_toolkits.mplot3d import Axes3D, art3d # loglinspace @k1lib.patch(Axes3D) # loglinspace def march(self, heatMap, level:float=0, facecolor=[0.45, 0.45, 0.75], edgecolor=None): # loglinspace """Use marching cubes to plot surface of a 3d heat map. Example:: plt.k3d(6).march(k1lib.perlin3d(), 0.17) .. image:: images/march.png A more tangible example:: t = torch.zeros(100, 100, 100) t[20:30,20:30,20:30] = 1 t[80:90,20:30,40:50] = 1 plt.k3d().march(t.numpy()) The function name is "march" because how it works internally is by using something called marching cubes. :param heatMap: 3d numpy array :param level: array value to form the surface on""" # loglinspace from skimage import measure # loglinspace try: verts, faces, normals, values = measure.marching_cubes(heatMap, level) # loglinspace except: verts, faces, normals, values = measure.marching_cubes_lewiner(heatMap, level) # loglinspace mesh = art3d.Poly3DCollection(verts[faces]) # loglinspace if facecolor is not None: mesh.set_facecolor(facecolor) # loglinspace if edgecolor is not None: mesh.set_edgecolor(edgecolor) # loglinspace self.add_collection3d(mesh) # loglinspace self.set_xlim(0, heatMap.shape[0]) # loglinspace self.set_ylim(0, heatMap.shape[1]) # loglinspace self.set_zlim(0, heatMap.shape[2]); return self # loglinspace @k1lib.patch(Axes3D) # loglinspace def aspect(self): # loglinspace """Use the same aspect ratio for all axes.""" # loglinspace self.set_box_aspect([ub - lb for lb, ub in (getattr(self, f'get_{a}lim')() for a in 'xyz')]) # loglinspace @k1lib.patch(plt) # loglinspace def k3d(size=8, labels=True, *args, **kwargs): # loglinspace """Convenience function to get an :class:`~mpl_toolkits.mplot3d.axes3d.Axes3D`. :param labels: whether to include xyz labels or not :param size: figure size""" # loglinspace if isinstance(size, (int, float)): size = (size, size) # loglinspace fig = plt.figure(figsize=size, constrained_layout=True, *args, **kwargs) # loglinspace ax = fig.add_subplot(projection="3d") # loglinspace if labels: # loglinspace ax.set_xlabel('x') # loglinspace ax.set_ylabel('y') # loglinspace ax.set_zlabel('z') # loglinspace return ax # loglinspace @k1lib.patch(plt) # loglinspace def animate(azimSpeed=3, azimStart=0, elevSpeed=0.9, elevStart=0, frames=20, close=True): # loglinspace """Animates the existing 3d axes. Example:: plt.k3d().scatter(*np.random.randn(3, 10)) plt.animate() :param frames: how many frames to render? Frame rate is 30 fps :param close: whether to close the figure (to prevent the animation and static plot showing at the same time) or not""" # loglinspace fig = plt.gcf() # loglinspace def f(frame): # loglinspace for ax in fig.axes: # loglinspace ax.view_init(elevStart+frame*elevSpeed, azimStart+frame*azimSpeed) # loglinspace if close: plt.close() # loglinspace return k1lib.viz.FAnim(fig, f, frames) # loglinspace @k1lib.patch(plt) # loglinspace def getFig(): # loglinspace """Grab figure of the current plot. Example:: plt.plot() | plt.getFig() | toImg() Internally, this just calls ``plt.gcf()`` and that's it, pretty simple. But I usually plot things as a part of the cli pipeline, and it's very annoying that I can't quite chain ``plt.gcf()`` operation, so I created this This has an alias called ``plt.toFig()`` """ # loglinspace return cli.aS(lambda _: plt.gcf()) # loglinspace @k1lib.patch(plt) # loglinspace def toFig(): return cli.aS(lambda _: plt.gcf()) # loglinspace k1lib._settings.monkey.add("capturePlt", False, "whether to intercept matplotlib's show() and turn it into an image or not") # loglinspace _oldShow = plt.show; _recentImg = [None] # loglinspace @k1lib.patch(plt) # loglinspace def show(*args, **kwargs): # loglinspace try: # loglinspace if k1lib._settings.monkey.capturePlt: _recentImg[0] = plt.gcf() | k1lib.cli.toImg() # loglinspace except: return _oldShow(*args, **kwargs) # loglinspace @k1lib.patch(plt) # loglinspace def _k1_capturedImg(): return _recentImg[0] # loglinspace except: pass # loglinspace try: # loglinspace @k1lib.patch(Axes3D) # loglinspace def plane(self, origin, v1, v2=None, s1:float=1, s2:float=1, **kwargs): # loglinspace """Plots a 3d plane. :param origin: origin vector, shape (3,) :param v1: 1st vector, shape (3,) :param v2: optional 2nd vector, shape(3,). If specified, plots a plane created by 2 vectors. If not, plots a plane perpendicular to the 1st vector :param s1: optional, how much to scale 1st vector by :param s2: optional, how much to scale 2nd vector by :param kwargs: keyword arguments passed to :meth:`~mpl_toolkits.mplot3d.axes3d.Axes3D.plot_surface`""" # loglinspace v1 = (v1 if isinstance(v1, torch.Tensor) else torch.tensor(v1)).float() # loglinspace if v2 is None: # loglinspace v = v1 # loglinspace v1 = torch.tensor([1.0, 1, -(v[0]+v[1])/v[2]]) # loglinspace v2 = torch.cross(v, v1) # loglinspace v2 = (v2 if isinstance(v2, torch.Tensor) else torch.tensor(v2)).float() # loglinspace origin = (origin if isinstance(origin, torch.Tensor) else torch.tensor(origin)).float() # loglinspace x = torch.linspace(-1, 1, 50)[:,None] # loglinspace v1 = (v1[None,:]*x*s1)[:,None] # loglinspace v2 = (v2[None,:]*x*s2)[None,:] # loglinspace origin = origin[None,None,:] # loglinspace plane = (origin + v1 + v2).permute(2, 0, 1) # loglinspace self.plot_surface(*plane.numpy(), **kwargs) # loglinspace except: pass # loglinspace try: # loglinspace @k1lib.patch(Axes3D) # loglinspace def point(self, v, **kwargs): # loglinspace """Plots a 3d point. :param v: point location, shape (3,) :param kwargs: keyword argument passed to :meth:`~mpl_toolkits.mplot3d.axes3d.Axes3D.scatter`""" # loglinspace v = (v if hasTorch and isinstance(v, torch.Tensor) else torch.tensor(v)).float() # loglinspace self.scatter(*v, **kwargs) # loglinspace @k1lib.patch(Axes3D) # loglinspace def line(self, v1, v2, **kwargs): # loglinspace """Plots a 3d line. :param v1: 1st point location, shape (3,) :param v2: 2nd point location, shape (3,) :param kwargs: keyword argument passed to :meth:`~mpl_toolkits.mplot3d.axes3d.Axes3D.plot`""" # loglinspace self.plot(*torch.tensor([list(v1), list(v2)]).float().T, **kwargs) # loglinspace except: pass # loglinspace try: # loglinspace @k1lib.patch(Axes3D) # loglinspace def surface(self, z, **kwargs): # loglinspace """Plots 2d surface in 3d. Pretty much exactly the same as :meth:`~mpl_toolkits.mplot3d.axes3d.Axes3D.plot_surface`, but fields x and y are filled in automatically. Example:: x, y = np.meshgrid(np.linspace(-2, 2), np.linspace(-2, 2)) plt.k3d(6).surface(x**3 + y**3) .. image:: images/surface.png :param z: 2d numpy array for the heights :param kwargs: keyword arguments passed to ``plot_surface``""" # loglinspace if hasTorch and isinstance(z, torch.Tensor): z = z.numpy() # loglinspace x, y = z.shape # loglinspace x, y = np.meshgrid(np.array(range(y)), np.array(range(x))) # loglinspace return self.plot_surface(x, y, z, **kwargs) # loglinspace except: pass # loglinspace try: # loglinspace import matplotlib as mpl, matplotlib.cm as cm, json # loglinspace @k1lib.patch(mpl.colors.Colormap) # loglinspace def __hash__(self): return hash(self.name) # loglinspace def _generate_jsF_cm(cm1): # loglinspace xs = np.linspace(0, 1, 100); data = cli.deref(igT=False)(cm1(xs)); data = [*data, data[-1], data[-1]] | cli.aS(json.dumps) # loglinspace def _jsF_cm(meta): # loglinspace fIdx = cli.init._jsFAuto(); dataIdx = cli.init._jsDAuto(); ctxIdx = f"{cli.init._jsDAuto()}_{round(time.time()*1000)}" # loglinspace return f"""//k1_moveOutStart\n{ctxIdx} = {data};\n//k1_moveOutEnd {fIdx} = ({dataIdx}) => {{ const isNumber = typeof({dataIdx}) === "number"; if (isNumber) {dataIdx} = [{dataIdx}]; const ans = []; for (let x of {dataIdx}) {{ x = Math.max(Math.min(x, 1), 0); const i = Math.floor(x*100); const b = x*100-i; const a = 1-b; const c1 = {ctxIdx}[i]; const c2 = {ctxIdx}[i+1]; ans.push([c1[0]*a+c2[0]*b, c1[1]*a+c2[1]*b, c1[2]*a+c2[2]*b, c1[3]*a+c2[3]*b]); }} return isNumber ? ans[0] : ans; }}""", fIdx # loglinspace return _jsF_cm # loglinspace try: # loglinspace for x in [x for x in cm.__dict__.values() if isinstance(x, mpl.colors.Colormap)]: # loglinspace try: k1lib.settings.cli.kjs.jsF[x] = _generate_jsF_cm(x) # loglinspace except: pass # loglinspace except: pass # loglinspace except: pass # loglinspace geopandas = k1lib.dep("geopandas", url="https://geopandas.org/") # loglinspace try: # loglinspace import matplotlib.pyplot as plt # loglinspace @k1lib.patch(plt) # loglinspace def worldmap(**kwargs): # loglinspace """Plots a world map on the current axes. Example:: plt.figure(figsize=(16, 10)) plt.worldmap() plt.plot((-118.2611, -83.2162, -97.822), (34.074, 42.3575, 37.751), "o") """ # loglinspace with k1lib.ignoreWarnings(): # loglinspace geopandas.read_file(geopandas.datasets.get_path("naturalearth_lowres")).plot(**{"color": "lightgrey", "ax": plt.gca(), **kwargs}) # loglinspace except: pass # loglinspace try: # loglinspace import pandas as pd # loglinspace @k1lib.patch(pd.DataFrame) # loglinspace def table(self): # loglinspace """Converts a :class:`pandas.core.frame.DataFrame` to a normal table made from lists (with column headers), so that it can be more easily manipulated with cli tools. Example:: pd.read_csv("abc.csv").table()""" # loglinspace yield self.columns.to_list() # loglinspace yield from self.values # loglinspace @k1lib.patch(pd.DataFrame) # loglinspace def newColName(self, N=1, prefix="col_"): # loglinspace """Creates a new column name that hasn't existed before Example:: df = pd.DataFrame(...) df.newColName(N=1) # likely returns "col_1" df.newColName(N=3) # likely returns ["col_1", "col_2", "col_3"] df.newColName(N=None) # returns an iterator that generates infinitely many names df.newColName(N=float("inf")) # same as above :param N: whether to generate a single name or multiple names :param prefix: prefix of the generated names""" # loglinspace autoIdx = k1lib.AutoIncrement(prefix=prefix); s = set(list(self)) # loglinspace if N == 1: # loglinspace while True: # loglinspace e = autoIdx() # loglinspace if e not in s: return e # loglinspace elif N is not None and N < float("inf"): # loglinspace ans = []; count = 0 # loglinspace while True: # loglinspace e = autoIdx() # loglinspace if e not in s: ans.append(e); count += 1 # loglinspace if count >= N: return ans # loglinspace else: # loglinspace def gen(): # loglinspace while True: # loglinspace e = autoIdx() # loglinspace if e not in s: yield e # loglinspace return gen() # loglinspace @k1lib.patch(pd.DataFrame) # loglinspace def replaceCol(self, name, col): # loglinspace """Replaces a specific column with a different specified column. Example:: df = pd.DataFrame({"A": [0, 1, 2], "B": [3, 4, 5]}) df.replaceCol("A", ["a", "b", "c"]) """ # loglinspace cols = [self[c] for c in list(self)]; cIdx = [i for i,c in enumerate(list(self)) if c == name][0] # loglinspace ogName = cols[cIdx].name; cols[cIdx] = col; nameGen = self.newColName(None) # loglinspace return pd.DataFrame({getattr(c, "name", ogName if i == cIdx else next(nameGen)):c for i,c in enumerate(cols)}) # loglinspace except: pass # loglinspace try: # loglinspace import forbiddenfruit # loglinspace def splitCamel(s): # loglinspace """Splits a string up based on camel case. Example:: # returns ['IHave', 'No', 'Idea', 'What', 'To', 'Put', 'Here'] "IHaveNoIdeaWhatToPutHere".splitCamel()""" # loglinspace words = [[s[0]]] # loglinspace for c in s[1:]: # loglinspace if words[-1][-1].islower() and c.isupper(): # loglinspace words.append(list(c)) # loglinspace else: words[-1].append(c) # loglinspace return [''.join(word) for word in words] # loglinspace forbiddenfruit.curse(str, "splitCamel", splitCamel) # loglinspace except: pass # loglinspace try: # loglinspace import ray # loglinspace @ray.remote # loglinspace class RayProgress: # loglinspace def __init__(self, n): self.values = [0]*n; self.thStop = False # loglinspace def update(self, idx:int, val:float): self.values[idx] = val; return self.values[idx] # loglinspace def stop(self): self.thStop = True # loglinspace def content(self): return self.thStop, self.values | cli.apply(lambda x: f"{round(x*100)}%") | cli.join(" | ") # loglinspace def startRayProgressThread(rp, title:str="Progress"): # loglinspace def inner(x): # loglinspace if x == 0: return # loglinspace print("Starting...\r", end="") # loglinspace beginTime = time.time() # loglinspace while True: # loglinspace stop, content = ray.get(rp.content.remote()) # loglinspace print(f"{title}: {content}, {round(time.time()-beginTime)}s elapsed \r", end="") # loglinspace if stop: break # loglinspace time.sleep(0.01) # loglinspace [0, 1] | cli.applyTh(inner, timeout=1e9, prefetch=10) | cli.item() # loglinspace @k1lib.patch(ray) # loglinspace @contextmanager # loglinspace def progress(n:int, title:str="Progress"): # loglinspace """Manages multiple progress bars distributedly. Example:: with ray.progress(5) as rp: def process(idx:int): for i in range(100): time.sleep(0.05) # do some processing rp.update.remote(idx, (i+1)/100) # update progress. Expect number between 0 and 1 range(5) | applyCl(process) | deref() # execute function in multiple nodes This will print out a progress bar that looks like this:: Progress: 100% | 100% | 100% | 100% | 100% :param n: number of progresses to keep track of :param title: title of the progress to show""" # loglinspace rp = RayProgress.remote(n); startRayProgressThread(rp, title); yield rp # loglinspace ray.get(rp.stop.remote()); time.sleep(0.1) # loglinspace except: pass # loglinspace @k1lib.patch(np) # loglinspace def gather(self, dim, index): # gather """Gathers values along an axis specified by ``dim``. For a 3-D tensor the output is specified by:: out[i][j][k] = input[index[i][j][k]][j][k] # if dim == 0 out[i][j][k] = input[i][index[i][j][k]][k] # if dim == 1 out[i][j][k] = input[i][j][index[i][j][k]] # if dim == 2 Not my code. All credits go to https://stackoverflow.com/questions/46868056/how-to-gather-elements-of-specific-indices-in-numpy :param dim: the axis along which to index :param index: A tensor of indices of elements to gather""" # gather idx_xsection_shape = index.shape[:dim] + index.shape[dim + 1:] # gather self_xsection_shape = self.shape[:dim] + self.shape[dim + 1:] # gather if idx_xsection_shape != self_xsection_shape: raise ValueError("Except for dimension " + str(dim) + ", all dimensions of index and self should be the same size") # gather if index.dtype != np.dtype('int_'): raise TypeError("The values of index must be integers") # gather data_swaped = np.swapaxes(self, 0, dim); index_swaped = np.swapaxes(index, 0, dim) # gather gathered = np.choose(index_swaped, data_swaped); return np.swapaxes(gathered, 0, dim) # gather try: # gather import forbiddenfruit # gather def expand(self, sh): return np.broadcast_to(self, sh) # gather forbiddenfruit.curse(np.ndarray, "expand", expand) # gather except: pass # gather @k1lib.patch(os) # gather def netstat(): # netstat """Runs netstat command and splits it up into a nice table. Example:: os.netstat() # returns [["Proto", "Recv-Q", "Send-Q", "Local Address", ...], [...], ...] """ # netstat cli = k1lib.cli; return None | cli.cmd("netstat -lunpt", mode=0) | cli.item() | ~cli.head(1) | cli.unpretty(headers=["Proto", "Recv", "Send-Q", "Local Address", "Foreign", "State", "PID"]) | cli.op().strip().all(2) | cli.deref() # netstat _portScan = {"lastScan": 0, "data": None} # netstat @k1lib.patch(os) # netstat def killPort(port:int, force:bool=False, allowMulti:bool=False): # killPort """Kills the process that is listening on a particular port. Example:: os.killPort(8888) # kill jupyterlab process, if it's running on this port :param force: if True, will send SIGKILL, else send SIGTERM :param allowMulti: if True, allows multiple processes listening on the same port, else throws an error when that happens""" # killPort cli = k1lib.cli # killPort if time.time() - _portScan["lastScan"] > 5: # killPort _portScan["data"] = os.netstat() | ~cli.head(1) | cli.cut(3, 6) | cli.apply(cli.op().split(":")[-1], 0) | cli.apply(cli.op().split("/")[0], 1) | cli.toInt(0, 1) | cli.apply(tuple) | cli.unique() | cli.groupBy(0, True) | cli.apply(cli.joinSt(), 1) | cli.deref() | cli.toDict() # killPort _portScan["lastScan"] = time.time() # killPort if port not in _portScan["data"]: raise Exception(f"No process detected to be listening on port {port}") # killPort pids = _portScan["data"][port] # killPort if len(pids) > 1 and not allowMulti: raise Exception(f"Port {port} has {len(pids)} processes. Situation might be more complex than you think, so aborting killing. Set .allowMulti to True if you really want to do this") # killPort sig = signal.SIGKILL if force else signal.SIGTERM # killPort for pid in pids: os.kill(pid, sig) # killPort try: # these just try to prevent giant images in .data fields from hanging up JupyterLab's contextual help # killPort import rosbag; bagMsg_oldRepr = rosbag.bag.BagMessage.__repr__ # killPort @k1lib.patch(rosbag.bag.BagMessage) # killPort def __repr__(self): # killPort s = bagMsg_oldRepr(self).split("\n"); ss = [] # killPort if hasattr(self.message, "data") and isinstance(self.message.data, bytes): # killPort for line in s: ss.append(f"data: ({len(self.message.data)} length), {self.message.data[:100]}..." if line.startswith("data: [") else line) # killPort return "\n".join(ss) # killPort return "\n".join(s) # killPort @k1lib.patch(rosbag.bag.BagMessage) # killPort def __str__(self): return self.__repr__() # killPort except: pass # killPort try: # killPort import genpy; genpyMsg_oldRepr = genpy.message.Message.__repr__ # killPort @k1lib.patch(genpy.message.Message) # killPort def __repr__(self): # killPort s = genpyMsg_oldRepr(self).split("\n"); ss = [] # killPort if hasattr(self, "data") and isinstance(self.data, bytes): # killPort for line in s: ss.append(f"data: ({len(self.data)} length), {self.data[:100]}..." if line.startswith("data: [") else line) # killPort return "\n".join(ss) # killPort return "\n".join(s) # killPort @k1lib.patch(genpy.message.Message) # killPort def __str__(self): return self.__repr__() # killPort except: pass # killPort def patchPandas(): # patchPandas """Patches panda's :class:`pandas.core.series.Series` and :class:`pandas.core.frame.DataFrame` so that piping works:: pd.read_csv("a.csv")["col3"] | shape()""" # patchPandas try: # patchPandas import pandas as pd # patchPandas except: return # patchPandas try: # patchPandas if pd._k1_patched: return # previously when used with forbiddenfruit, apparently its bad for this function to run again, so have a sentinel/guard here to check. Not sure if this is still needed # patchPandas except: pass # patchPandas def patch(klass): # patchPandas oldPdOr = klass.__or__ # patchPandas def newPdOr(self, v): # patchPandas if isinstance(v, cli.BaseCli): return NotImplemented # patchPandas try: return oldPdOr(self, v) # patchPandas except: warnings.warn(traceback.format_exc()) # patchPandas klass.__or__ = newPdOr # forbiddenfruit.curse(klass, "__or__", newPdOr) # turns out forbiddenfruit is not needed! # patchPandas return oldPdOr, newPdOr # patchPandas try: # patchPandas patch(pd.core.arraylike.OpsMixin) # patchPandas pd._k1_patched = True # patchPandas except Exception as e: warnings.warn(f"Tried to patch __or__ operator of type `pd.core.arraylike.OpsMixin` but can't because: {e}") # patchPandas patchPandas() # patchPandas try: # patchPandas import html, base64 # patchPandas @k1lib.patch(html) # patchPandas def b64escape(s): # patchPandas """Escape the html by converting the whole thing to base64, then return a string with "atob" function that decodes that mess""" # patchPandas return f"atob('{base64.b64encode(s.encode()).decode()}')" # patchPandas except: pass # patchPandas