# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
import k1lib, math, numpy as np, time, signal, os, warnings, traceback, json
from k1lib import cli
from typing import List, Tuple, ContextManager
from contextlib import contextmanager
try: import torch; from torch import nn; hasTorch = True
except: hasTorch = False
try: import matplotlib.animation
except: pass
try: import matplotlib.pyplot as plt
except: pass
__all__ = ["dummy"]
[docs]def dummy(): # dummy
"""Does nothing. Only here so that you can read source code of this file and
see what’s up.""" # dummy
pass # dummy
settings = k1lib.settings.add("monkey", k1lib.Settings(), "monkey-patched settings").monkey # dummy
k1lib._settings.add("monkey", k1lib.Settings(), "monkey-patched settings") # dummy
try: # dummy
import forbiddenfruit # dummy
def hasNan(self): # dummy
"""Checks whether numpy array has nan or not""" # dummy
return np.isnan(self).any() # dummy
forbiddenfruit.curse(np.ndarray, "hasNan", hasNan) # dummy
def clearNan(self, value:float=0.0) -> np.ndarray: # dummy
"""Sets all nan values to a specified value.
Example::
a = np.random.randn(3, 3) * float("nan")
a.clearNan() # now full of zeros""" # dummy
self[self != self] = value; return self # dummy
forbiddenfruit.curse(np.ndarray, "clearNan", clearNan) # dummy
except: pass # dummy
if hasTorch: # dummy
@k1lib.patch(nn.Module) # dummy
def importParams(self:nn.Module, params:List[nn.Parameter]): # dummy
"""Given a list of :class:`torch.nn.parameter.Parameter`/:class:`torch.Tensor`,
update the current :class:`torch.nn.Module`'s parameters with it'""" # dummy
for oldParam, newParam in zip(self.parameters(), params): # dummy
oldParam.data = newParam.data.clone() # dummy
@k1lib.patch(nn.Module) # dummy
def exportParams(self:nn.Module) -> List[torch.Tensor]: # dummy
"""Gets the list of :class:`torch.Tensor` data""" # dummy
return [param.data.clone() for param in self.parameters()] # dummy
class ParamsContext: # dummy
def __init__(self, m:nn.Module): self.m = m # dummy
def __enter__(self): self.params = self.m.exportParams(); return self.params # dummy
def __exit__(self, *ignored): self.m.importParams(self.params) # dummy
if hasTorch: # dummy
@k1lib.patch(nn.Module) # dummy
@contextmanager # dummy
def paramsContext(self:nn.Module): # dummy
"""A nice context manager for :meth:`importParams` and :meth:`exportParams`.
Returns the old parameters on enter context. Example::
m = nn.Linear(2, 3)
with m.paramsContext() as oldParam:
pass # go wild, train, mutate `m` however much you like
# m automatically snaps back to the old param
Small reminder that this is not foolproof, as there are some :class:`~torch.nn.Module`
that stores extra information not accessible from the model itself, like
:class:`~torch.nn.BatchNorm2d`.""" # dummy
params = self.exportParams() # dummy
try: yield # dummy
finally: self.importParams(params) # dummy
if hasTorch: # dummy
@k1lib.patch(nn.Module) # dummy
def getParamsVector(model:nn.Module) -> List[torch.Tensor]: # dummy
"""For each parameter, returns a normal distributed random tensor
with the same standard deviation as the original parameter""" # dummy
answer = [] # dummy
for param in model.parameters(): # dummy
a = torch.randn(param.shape).to(param.device) # dummy
b = param.std() if param.numel() > 1 else 1 # dummy
answer.append(a * b) # dummy
return answer # dummy
from k1lib.cli import apply, deref, op, item # dummy
if hasTorch: # dummy
@k1lib.patch(nn.Module) # dummy
@contextmanager # dummy
def deviceContext(self:nn.Module, buffers:bool=True) -> ContextManager: # dummy
"""Preserves the device of whatever operation is inside this.
Example::
import torch.nn as nn
m = nn.Linear(3, 4)
with m.deviceContext():
m.cuda() # moves whole model to cuda
# automatically moves model to cpu
This is capable of preserving buffers' devices too. But it might be unstable.
:class:`~torch.nn.parameter.Parameter` are often updated inline, and they keep
their old identity, which makes it easy to keep track of which device the parameters
are on. However, buffers are rarely updated inline, so their identities change all
the time. To deal with this, this does something like this::
devices = [buf.device for buf in self.buffers()]
yield # entering context manager
for buffer, device in zip(self.buffers(), devices):
buffer.data = buffer.data.to(device=device)
This means that while inside the context, if you add a buffer anywhere to the
network, buffer-device alignment will be shifted and wrong. So, register all
your buffers (aka Tensors attached to :class:`~torch.nn.Module`) outside this context
to avoid headaches, or set ``buffers`` option to False.
If you don't know what I'm talking about, don't worry and just leave as default.
:param buffers: whether to preserve device of buffers (regular Tensors attached
to :class:`~torch.nn.Module`) or not.""" # dummy
pDevs = self.parameters() | apply(lambda t: (t, t.device)) | deref() # dummy
if buffers: pbDevs = pbDevs = self.modules() |\
apply(lambda m: (m, m | op().buffers(recurse=False) | op().device.all() | deref())) | deref(maxDepth=1) # dummy
try: yield # dummy
finally: # dummy
for p, dev in pDevs: p.data = p.data.to(device=dev) # dummy
if buffers: # dummy
for m, bDevs in pbDevs: # dummy
for buf, dev in zip(m.buffers(recurse=False), bDevs): # dummy
buf.data = buf.data.to(device=dev) # dummy
if hasTorch: # dummy
@k1lib.patch(nn.Module) # dummy
@contextmanager # dummy
def gradContext(self): # dummy
"""Preserves the requires_grad attribute.
Example::
m = nn.Linear(2, 3)
with m.gradContext():
m.weight.requires_grad = False
# returns True
m.weight.requires_grad
It's worth mentioning that this does not work with buffers (Tensors attached to
:class:`torch.nn.Module`), as buffers are not meant to track gradients!""" # dummy
grads = [(p, p.requires_grad) for p in self.parameters()] # dummy
try: yield # dummy
finally: # dummy
for p, grad in grads: p.requires_grad_(grad) # dummy
if hasTorch: # dummy
@k1lib.patch(nn.Module) # dummy
def __ror__(self, x): # dummy
"""Allows piping input to :class:`torch.nn.Module`, to match same style as
the module :mod:`k1lib.cli`. Example::
# returns torch.Size([5, 3])
torch.randn(5, 2) | nn.Linear(2, 3) | cli.shape()""" # dummy
return self(x) # dummy
if hasTorch: # dummy
@k1lib.patch(nn.Module, name="nParams") # dummy
@property # dummy
def nParams(self): # dummy
"""Get the number of parameters of this module.
Example::
# returns 9, because 6 (2*3) for weight, and 3 for bias
nn.Linear(2, 3).nParams""" # dummy
return sum([p.numel() for p in self.parameters()]) # dummy
if hasTorch: # dummy
@k1lib.patch(torch) # dummy
@k1lib.patch(torch.Tensor) # dummy
def crissCross(*others:Tuple[torch.Tensor]) -> torch.Tensor: # dummy
"""Concats multiple 1d tensors, sorts it, and get evenly-spaced values. Also
available as :meth:`torch.crissCross` and :meth:`~k1lib.cli.others.crissCross`.
Example::
a = torch.tensor([2, 2, 3, 6])
b = torch.tensor([4, 8, 10, 12, 18, 20, 30, 35])
# returns tensor([2, 3, 6, 10, 18, 30])
a.crissCross(b)
# returns tensor([ 2, 4, 8, 10, 18, 20, 30, 35])
a.crissCross(*([b]*10)) # 1 "a" and 10 "b"s
# returns tensor([ 2, 2, 3, 6, 18])
b.crissCross(*([a]*10)) # 1 "b" and 10 "a"s
Note how in the second case, the length is the same as tensor b, and the contents
are pretty close to b. In the third case, it's the opposite. Length is almost
the same as tensor a, and the contents are also pretty close to a.""" # dummy
return torch.cat([o.flatten() for o in others]).sort()[0][::len(others)] # dummy
if hasTorch: # dummy
@k1lib.patch(torch) # dummy
def sameStorage(a, b): # dummy
"""Check whether 2 (:class:`numpy.ndarray` or :class:`torch.Tensor`)
has the same storage or not. Example::
a = np.linspace(2, 3, 50)
# returns True
torch.sameStorage(a, a[:5])
# returns True
torch.sameStorage(a[:10], a[:5])
returns false
torch.sameStorage(a[:10], np.linspace(3, 4))
All examples above should work with PyTorch tensors as well.""" # dummy
if isinstance(a, torch.Tensor) and isinstance(b, torch.Tensor): # dummy
return a.data_ptr() == b.data_ptr() # dummy
if isinstance(a, np.ndarray) and isinstance(b, np.ndarray): # dummy
return a.base is b or b.base is a or a.base is b.base # dummy
return a is b # dummy
if hasTorch: # dummy
@k1lib.patch(torch.Tensor) # dummy
def histBounds(self:torch.Tensor, bins=100) -> torch.Tensor: # dummy
r"""Flattens and sorts the tensor, then get value of tensor at regular
linspace intervals. Does not guarantee bounds' uniqueness. Example::
# Tensor with lots of 2s and 5s
a = torch.Tensor([2]*5 + [3]*3 + [4] + [5]*4)
# returns torch.tensor([2., 3., 5.])
a.histBounds(3).unique()
The example result essentially shows 3 bins: :math:`[2, 3)`, :math:`[3, 5)` and
:math:`[5, \infty)`. This might be useful in scaling pixels so that networks handle
it nicely. Rough idea taken from fastai.medical.imaging.""" # dummy
sortedTensor = self.flatten().sort()[0] # dummy
ls = torch.linspace(0, 1, bins); ls[-1] = 1-1e-6 # dummy
bigLs = (ls * len(sortedTensor)).long() # dummy
return sortedTensor[bigLs] # dummy
if hasTorch: # dummy
@k1lib.patch(torch.Tensor) # dummy
def histScaled(self:torch.Tensor, bins=100, bounds=None) -> torch.Tensor: # dummy
"""Scale tensor's values so that the values are roughly spreaded out in range
:math:`[0, 1]` to ease neural networks' pain. Rough idea taken from
fastai.medical.imaging. Example::
# normal-distributed values
a = torch.randn(1000)
# plot #1 shows a normal distribution
plt.hist(a.numpy(), bins=30); plt.show()
# plot #2 shows almost-uniform distribution
plt.hist(a.histScaled().numpy()); plt.show()
Plot #1:
.. image:: images/histScaledNormal.png
Plot #2:
.. image:: images/histScaledUniform.png
:param bins: if ``bounds`` not specified, then will scale according to a hist
with this many bins
:param bounds: if specified, then ``bins`` is ignored and will scale according to
this. Expected this to be a sorted tensor going from ``min(self)`` to
``max(self)``.""" # dummy
if bounds is None: bounds = self.histBounds(bins).unique() # dummy
else: bounds = bounds.unique() # dummy
out = np.interp(self.numpy().flatten(), bounds, np.linspace(0, 1, len(bounds))) # dummy
return torch.tensor(out).reshape(self.shape) # dummy
if hasTorch: # dummy
@k1lib.patch(torch.Tensor) # dummy
def positionalEncode(t:torch.Tensor, richFactor:float=2) -> torch.Tensor: # dummy
r"""Position encode a tensor of shape :math:`(L, F)`, where :math:`L`
is the sequence length, :math:`F` is the encoded features. Will add the
encodings directly to the input tensor and return it.
This is a bit different from the standard implementations that ppl use.
This is exactly:
.. math:: p = \frac{i}{F\cdot richFactor}
.. math:: w = 1/10000^p
.. math:: pe = sin(w * L)
With ``i`` from range [0, F), and ``p`` the "progress". If ``richFactor`` is 1
(original algo), then ``p`` goes from 0% to 100% of the features. Example::
import matplotlib.pyplot as plt, torch, k1lib
plt.figure(dpi=150)
plt.imshow(torch.zeros(100, 10).positionalEncode().T)
.. image:: images/positionalEncoding.png
:param richFactor: the bigger, the richer the features are. A lot of
times, I observe that the features that are meant to cover huge scales
are pretty empty and don't really contribute anything useful. So this
is to bump up the usefulness of those features""" # dummy
seqN, featsN = t.shape # dummy
feats = torch.tensor(range(featsN)); w = (1/10000**(feats/featsN/richFactor))[None, :].expand(t.shape) # dummy
times = torch.tensor(range(seqN))[:, None].expand(t.shape) # dummy
t[:] = torch.sin(w * times); return t # dummy
if hasTorch: # dummy
@k1lib.patch(torch.Tensor) # dummy
def clearNan(self, value:float=0.0) -> torch.Tensor: # dummy
"""Sets all nan values to a specified value.
Example::
a = torch.randn(3, 3) * float("nan")
a.clearNan() # now full of zeros""" # dummy
self[self != self] = value # dummy
return self # dummy
if hasTorch: # dummy
@k1lib.patch(torch.Tensor) # dummy
def hasNan(self) -> bool: # dummy
"""Returns whether this Tensor has any nan values at all.""" # dummy
return (self != self).sum() > 0 # dummy
if hasTorch: # dummy
@k1lib.patch(torch.Tensor) # dummy
def stats(self) -> Tuple[float, float]: # dummy
return self.mean(), self.std() # dummy
inf = float("inf") # dummy
if hasTorch: # dummy
@k1lib.patch(torch.Tensor) # dummy
def hasInfs(self): # dummy
"""Whether this Tensor has negative or positive infinities.""" # dummy
return (self == inf).any() or (self == -inf).any() # dummy
if hasTorch: # dummy
@k1lib.patch(torch) # dummy
def loglinspace(a, b, n=100, **kwargs): # dummy
"""Like :meth:`torch.linspace`, but spread the values out in log space,
instead of linear space. Different from :meth:`torch.logspace`""" # dummy
return math.e**torch.linspace(math.log(a), math.log(b), n, **kwargs) # dummy
@k1lib.patch(torch) # dummy
def transpose_axes(it:"torch.Tensor", axes:"tuple[int]"): # dummy
"""Transpose but with numpy-like signature.
Example::
torch.transpose_axes(torch.randn(3,4,5,6), (2,3,1,0)) | shape() # returns (5,6,4,3)
""" # dummy
n = len(axes); d = {x:x for x in range(n)} # dummy
if len(axes) != len(it.shape): raise ValueError("axes don't match tensor") # dummy
for a in range(n): # dummy
b = d[axes[a]]; x = d[a]; y = d[b]; t = d[x]; d[x] = d[y]; d[y] = t # dummy
if a != b: it = torch.transpose(it, a, b) # dummy
return it # dummy
@k1lib.patch(np) # dummy
def loglinspace(a, b, n=100, **kwargs): # loglinspace
"""Like :meth:`torch.linspace`, but spread the values out in log space,
instead of linear space. Different from :meth:`torch.logspace`""" # loglinspace
return math.e**np.linspace(math.log(a), math.log(b), n, **kwargs) # loglinspace
try: # loglinspace
import graphviz # loglinspace
@k1lib.patch(graphviz.Digraph, "__call__") # loglinspace
@k1lib.patch(graphviz.Graph, "__call__") # loglinspace
def _call(self, _from, *tos, **kwargs): # loglinspace
"""Convenience method to quickly construct graphs.
Example::
g = k1lib.graph()
g("a", "b", "c")
g # displays arrows from "a" to "b" and "a" to "c"
""" # loglinspace
for to in tos: self.edge(_from, to, **kwargs) # loglinspace
except: pass # loglinspace
try: # loglinspace
import matplotlib.pyplot as plt # loglinspace
from mpl_toolkits.mplot3d import Axes3D, art3d # loglinspace
@k1lib.patch(Axes3D) # loglinspace
def march(self, heatMap, level:float=0, facecolor=[0.45, 0.45, 0.75], edgecolor=None): # loglinspace
"""Use marching cubes to plot surface of a 3d heat map.
Example::
plt.k3d(6).march(k1lib.perlin3d(), 0.17)
.. image:: images/march.png
A more tangible example::
t = torch.zeros(100, 100, 100)
t[20:30,20:30,20:30] = 1
t[80:90,20:30,40:50] = 1
plt.k3d().march(t.numpy())
The function name is "march" because how it works internally is by using
something called marching cubes.
:param heatMap: 3d numpy array
:param level: array value to form the surface on""" # loglinspace
from skimage import measure # loglinspace
try: verts, faces, normals, values = measure.marching_cubes(heatMap, level) # loglinspace
except: verts, faces, normals, values = measure.marching_cubes_lewiner(heatMap, level) # loglinspace
mesh = art3d.Poly3DCollection(verts[faces]) # loglinspace
if facecolor is not None: mesh.set_facecolor(facecolor) # loglinspace
if edgecolor is not None: mesh.set_edgecolor(edgecolor) # loglinspace
self.add_collection3d(mesh) # loglinspace
self.set_xlim(0, heatMap.shape[0]) # loglinspace
self.set_ylim(0, heatMap.shape[1]) # loglinspace
self.set_zlim(0, heatMap.shape[2]); return self # loglinspace
@k1lib.patch(Axes3D) # loglinspace
def aspect(self): # loglinspace
"""Use the same aspect ratio for all axes.""" # loglinspace
self.set_box_aspect([ub - lb for lb, ub in (getattr(self, f'get_{a}lim')() for a in 'xyz')]) # loglinspace
@k1lib.patch(plt) # loglinspace
def k3d(size=8, labels=True, *args, **kwargs): # loglinspace
"""Convenience function to get an :class:`~mpl_toolkits.mplot3d.axes3d.Axes3D`.
:param labels: whether to include xyz labels or not
:param size: figure size""" # loglinspace
if isinstance(size, (int, float)): size = (size, size) # loglinspace
fig = plt.figure(figsize=size, constrained_layout=True, *args, **kwargs) # loglinspace
ax = fig.add_subplot(projection="3d") # loglinspace
if labels: # loglinspace
ax.set_xlabel('x') # loglinspace
ax.set_ylabel('y') # loglinspace
ax.set_zlabel('z') # loglinspace
return ax # loglinspace
@k1lib.patch(plt) # loglinspace
def animate(azimSpeed=3, azimStart=0, elevSpeed=0.9, elevStart=0, frames=20, close=True): # loglinspace
"""Animates the existing 3d axes.
Example::
plt.k3d().scatter(*np.random.randn(3, 10))
plt.animate()
:param frames: how many frames to render? Frame rate is 30 fps
:param close: whether to close the figure (to prevent the animation and
static plot showing at the same time) or not""" # loglinspace
fig = plt.gcf() # loglinspace
def f(frame): # loglinspace
for ax in fig.axes: # loglinspace
ax.view_init(elevStart+frame*elevSpeed, azimStart+frame*azimSpeed) # loglinspace
if close: plt.close() # loglinspace
return k1lib.viz.FAnim(fig, f, frames) # loglinspace
@k1lib.patch(plt) # loglinspace
def getFig(): # loglinspace
"""Grab figure of the current plot.
Example::
plt.plot() | plt.getFig() | toImg()
Internally, this just calls ``plt.gcf()`` and that's it, pretty simple.
But I usually plot things as a part of the cli pipeline, and it's very
annoying that I can't quite chain ``plt.gcf()`` operation, so I created
this
This has an alias called ``plt.toFig()``
""" # loglinspace
return cli.aS(lambda _: plt.gcf()) # loglinspace
@k1lib.patch(plt) # loglinspace
def toFig(): return cli.aS(lambda _: plt.gcf()) # loglinspace
k1lib._settings.monkey.add("capturePlt", False, "whether to intercept matplotlib's show() and turn it into an image or not") # loglinspace
_oldShow = plt.show; _recentImg = [None] # loglinspace
@k1lib.patch(plt) # loglinspace
def show(*args, **kwargs): # loglinspace
try: # loglinspace
if k1lib._settings.monkey.capturePlt: _recentImg[0] = plt.gcf() | k1lib.cli.toImg() # loglinspace
except: return _oldShow(*args, **kwargs) # loglinspace
@k1lib.patch(plt) # loglinspace
def _k1_capturedImg(): return _recentImg[0] # loglinspace
except: pass # loglinspace
try: # loglinspace
@k1lib.patch(Axes3D) # loglinspace
def plane(self, origin, v1, v2=None, s1:float=1, s2:float=1, **kwargs): # loglinspace
"""Plots a 3d plane.
:param origin: origin vector, shape (3,)
:param v1: 1st vector, shape (3,)
:param v2: optional 2nd vector, shape(3,). If specified, plots a plane created
by 2 vectors. If not, plots a plane perpendicular to the 1st vector
:param s1: optional, how much to scale 1st vector by
:param s2: optional, how much to scale 2nd vector by
:param kwargs: keyword arguments passed to :meth:`~mpl_toolkits.mplot3d.axes3d.Axes3D.plot_surface`""" # loglinspace
v1 = (v1 if isinstance(v1, torch.Tensor) else torch.tensor(v1)).float() # loglinspace
if v2 is None: # loglinspace
v = v1 # loglinspace
v1 = torch.tensor([1.0, 1, -(v[0]+v[1])/v[2]]) # loglinspace
v2 = torch.cross(v, v1) # loglinspace
v2 = (v2 if isinstance(v2, torch.Tensor) else torch.tensor(v2)).float() # loglinspace
origin = (origin if isinstance(origin, torch.Tensor) else torch.tensor(origin)).float() # loglinspace
x = torch.linspace(-1, 1, 50)[:,None] # loglinspace
v1 = (v1[None,:]*x*s1)[:,None] # loglinspace
v2 = (v2[None,:]*x*s2)[None,:] # loglinspace
origin = origin[None,None,:] # loglinspace
plane = (origin + v1 + v2).permute(2, 0, 1) # loglinspace
self.plot_surface(*plane.numpy(), **kwargs) # loglinspace
except: pass # loglinspace
try: # loglinspace
@k1lib.patch(Axes3D) # loglinspace
def point(self, v, **kwargs): # loglinspace
"""Plots a 3d point.
:param v: point location, shape (3,)
:param kwargs: keyword argument passed to :meth:`~mpl_toolkits.mplot3d.axes3d.Axes3D.scatter`""" # loglinspace
v = (v if hasTorch and isinstance(v, torch.Tensor) else torch.tensor(v)).float() # loglinspace
self.scatter(*v, **kwargs) # loglinspace
@k1lib.patch(Axes3D) # loglinspace
def line(self, v1, v2, **kwargs): # loglinspace
"""Plots a 3d line.
:param v1: 1st point location, shape (3,)
:param v2: 2nd point location, shape (3,)
:param kwargs: keyword argument passed to :meth:`~mpl_toolkits.mplot3d.axes3d.Axes3D.plot`""" # loglinspace
self.plot(*torch.tensor([list(v1), list(v2)]).float().T, **kwargs) # loglinspace
except: pass # loglinspace
try: # loglinspace
@k1lib.patch(Axes3D) # loglinspace
def surface(self, z, **kwargs): # loglinspace
"""Plots 2d surface in 3d. Pretty much exactly the same as
:meth:`~mpl_toolkits.mplot3d.axes3d.Axes3D.plot_surface`, but fields x and y
are filled in automatically. Example::
x, y = np.meshgrid(np.linspace(-2, 2), np.linspace(-2, 2))
plt.k3d(6).surface(x**3 + y**3)
.. image:: images/surface.png
:param z: 2d numpy array for the heights
:param kwargs: keyword arguments passed to ``plot_surface``""" # loglinspace
if hasTorch and isinstance(z, torch.Tensor): z = z.numpy() # loglinspace
x, y = z.shape # loglinspace
x, y = np.meshgrid(np.array(range(y)), np.array(range(x))) # loglinspace
return self.plot_surface(x, y, z, **kwargs) # loglinspace
except: pass # loglinspace
try: # loglinspace
import matplotlib as mpl, matplotlib.cm as cm, json # loglinspace
@k1lib.patch(mpl.colors.Colormap) # loglinspace
def __hash__(self): return hash(self.name) # loglinspace
def _generate_jsF_cm(cm1): # loglinspace
xs = np.linspace(0, 1, 100); data = cli.deref(igT=False)(cm1(xs)); data = [*data, data[-1], data[-1]] | cli.aS(json.dumps) # loglinspace
def _jsF_cm(meta): # loglinspace
fIdx = cli.init._jsFAuto(); dataIdx = cli.init._jsDAuto(); ctxIdx = f"{cli.init._jsDAuto()}_{round(time.time()*1000)}" # loglinspace
return f"""//k1_moveOutStart\n{ctxIdx} = {data};\n//k1_moveOutEnd
{fIdx} = ({dataIdx}) => {{
const isNumber = typeof({dataIdx}) === "number";
if (isNumber) {dataIdx} = [{dataIdx}];
const ans = [];
for (let x of {dataIdx}) {{
x = Math.max(Math.min(x, 1), 0);
const i = Math.floor(x*100); const b = x*100-i; const a = 1-b;
const c1 = {ctxIdx}[i]; const c2 = {ctxIdx}[i+1];
ans.push([c1[0]*a+c2[0]*b, c1[1]*a+c2[1]*b, c1[2]*a+c2[2]*b, c1[3]*a+c2[3]*b]);
}}
return isNumber ? ans[0] : ans;
}}""", fIdx # loglinspace
return _jsF_cm # loglinspace
try: # loglinspace
for x in [x for x in cm.__dict__.values() if isinstance(x, mpl.colors.Colormap)]: # loglinspace
try: k1lib.settings.cli.kjs.jsF[x] = _generate_jsF_cm(x) # loglinspace
except: pass # loglinspace
except: pass # loglinspace
except: pass # loglinspace
geopandas = k1lib.dep("geopandas", url="https://geopandas.org/") # loglinspace
try: # loglinspace
import matplotlib.pyplot as plt # loglinspace
@k1lib.patch(plt) # loglinspace
def worldmap(**kwargs): # loglinspace
"""Plots a world map on the current axes.
Example::
plt.figure(figsize=(16, 10))
plt.worldmap()
plt.plot((-118.2611, -83.2162, -97.822), (34.074, 42.3575, 37.751), "o")
""" # loglinspace
with k1lib.ignoreWarnings(): # loglinspace
geopandas.read_file(geopandas.datasets.get_path("naturalearth_lowres")).plot(**{"color": "lightgrey", "ax": plt.gca(), **kwargs}) # loglinspace
except: pass # loglinspace
try: # loglinspace
import pandas as pd # loglinspace
@k1lib.patch(pd.DataFrame) # loglinspace
def table(self): # loglinspace
"""Converts a :class:`pandas.core.frame.DataFrame` to a normal table made
from lists (with column headers), so that it can be more easily manipulated
with cli tools. Example::
pd.read_csv("abc.csv").table()""" # loglinspace
yield self.columns.to_list() # loglinspace
yield from self.values # loglinspace
@k1lib.patch(pd.DataFrame) # loglinspace
def newColName(self, N=1, prefix="col_"): # loglinspace
"""Creates a new column name that hasn't existed before
Example::
df = pd.DataFrame(...)
df.newColName(N=1) # likely returns "col_1"
df.newColName(N=3) # likely returns ["col_1", "col_2", "col_3"]
df.newColName(N=None) # returns an iterator that generates infinitely many names
df.newColName(N=float("inf")) # same as above
:param N: whether to generate a single name or multiple names
:param prefix: prefix of the generated names""" # loglinspace
autoIdx = k1lib.AutoIncrement(prefix=prefix); s = set(list(self)) # loglinspace
if N == 1: # loglinspace
while True: # loglinspace
e = autoIdx() # loglinspace
if e not in s: return e # loglinspace
elif N is not None and N < float("inf"): # loglinspace
ans = []; count = 0 # loglinspace
while True: # loglinspace
e = autoIdx() # loglinspace
if e not in s: ans.append(e); count += 1 # loglinspace
if count >= N: return ans # loglinspace
else: # loglinspace
def gen(): # loglinspace
while True: # loglinspace
e = autoIdx() # loglinspace
if e not in s: yield e # loglinspace
return gen() # loglinspace
@k1lib.patch(pd.DataFrame) # loglinspace
def replaceCol(self, name, col): # loglinspace
"""Replaces a specific column with a different specified column.
Example::
df = pd.DataFrame({"A": [0, 1, 2], "B": [3, 4, 5]})
df.replaceCol("A", ["a", "b", "c"])
""" # loglinspace
cols = [self[c] for c in list(self)]; cIdx = [i for i,c in enumerate(list(self)) if c == name][0] # loglinspace
ogName = cols[cIdx].name; cols[cIdx] = col; nameGen = self.newColName(None) # loglinspace
return pd.DataFrame({getattr(c, "name", ogName if i == cIdx else next(nameGen)):c for i,c in enumerate(cols)}) # loglinspace
except: pass # loglinspace
try: # loglinspace
import forbiddenfruit # loglinspace
def splitCamel(s): # loglinspace
"""Splits a string up based on camel case.
Example::
# returns ['IHave', 'No', 'Idea', 'What', 'To', 'Put', 'Here']
"IHaveNoIdeaWhatToPutHere".splitCamel()""" # loglinspace
words = [[s[0]]] # loglinspace
for c in s[1:]: # loglinspace
if words[-1][-1].islower() and c.isupper(): # loglinspace
words.append(list(c)) # loglinspace
else: words[-1].append(c) # loglinspace
return [''.join(word) for word in words] # loglinspace
forbiddenfruit.curse(str, "splitCamel", splitCamel) # loglinspace
except: pass # loglinspace
try: # loglinspace
import ray # loglinspace
@ray.remote # loglinspace
class RayProgress: # loglinspace
def __init__(self, n): self.values = [0]*n; self.thStop = False # loglinspace
def update(self, idx:int, val:float): self.values[idx] = val; return self.values[idx] # loglinspace
def stop(self): self.thStop = True # loglinspace
def content(self): return self.thStop, self.values | cli.apply(lambda x: f"{round(x*100)}%") | cli.join(" | ") # loglinspace
def startRayProgressThread(rp, title:str="Progress"): # loglinspace
def inner(x): # loglinspace
if x == 0: return # loglinspace
print("Starting...\r", end="") # loglinspace
beginTime = time.time() # loglinspace
while True: # loglinspace
stop, content = ray.get(rp.content.remote()) # loglinspace
print(f"{title}: {content}, {round(time.time()-beginTime)}s elapsed \r", end="") # loglinspace
if stop: break # loglinspace
time.sleep(0.01) # loglinspace
[0, 1] | cli.applyTh(inner, timeout=1e9, prefetch=10) | cli.item() # loglinspace
@k1lib.patch(ray) # loglinspace
@contextmanager # loglinspace
def progress(n:int, title:str="Progress"): # loglinspace
"""Manages multiple progress bars distributedly.
Example::
with ray.progress(5) as rp:
def process(idx:int):
for i in range(100):
time.sleep(0.05) # do some processing
rp.update.remote(idx, (i+1)/100) # update progress. Expect number between 0 and 1
range(5) | applyCl(process) | deref() # execute function in multiple nodes
This will print out a progress bar that looks like this::
Progress: 100% | 100% | 100% | 100% | 100%
:param n: number of progresses to keep track of
:param title: title of the progress to show""" # loglinspace
rp = RayProgress.remote(n); startRayProgressThread(rp, title); yield rp # loglinspace
ray.get(rp.stop.remote()); time.sleep(0.1) # loglinspace
except: pass # loglinspace
@k1lib.patch(np) # loglinspace
def gather(self, dim, index): # gather
"""Gathers values along an axis specified by ``dim``.
For a 3-D tensor the output is specified by::
out[i][j][k] = input[index[i][j][k]][j][k] # if dim == 0
out[i][j][k] = input[i][index[i][j][k]][k] # if dim == 1
out[i][j][k] = input[i][j][index[i][j][k]] # if dim == 2
Not my code. All credits go to https://stackoverflow.com/questions/46868056/how-to-gather-elements-of-specific-indices-in-numpy
:param dim: the axis along which to index
:param index: A tensor of indices of elements to gather""" # gather
idx_xsection_shape = index.shape[:dim] + index.shape[dim + 1:] # gather
self_xsection_shape = self.shape[:dim] + self.shape[dim + 1:] # gather
if idx_xsection_shape != self_xsection_shape: raise ValueError("Except for dimension " + str(dim) + ", all dimensions of index and self should be the same size") # gather
if index.dtype != np.dtype('int_'): raise TypeError("The values of index must be integers") # gather
data_swaped = np.swapaxes(self, 0, dim); index_swaped = np.swapaxes(index, 0, dim) # gather
gathered = np.choose(index_swaped, data_swaped); return np.swapaxes(gathered, 0, dim) # gather
try: # gather
import forbiddenfruit # gather
def expand(self, sh): return np.broadcast_to(self, sh) # gather
forbiddenfruit.curse(np.ndarray, "expand", expand) # gather
except: pass # gather
@k1lib.patch(os) # gather
def netstat(): # netstat
"""Runs netstat command and splits it up into a nice table.
Example::
os.netstat() # returns [["Proto", "Recv-Q", "Send-Q", "Local Address", ...], [...], ...]
""" # netstat
cli = k1lib.cli; return None | cli.cmd("netstat -lunpt", mode=0) | cli.item() | ~cli.head(1) | cli.unpretty(headers=["Proto", "Recv", "Send-Q", "Local Address", "Foreign", "State", "PID"]) | cli.op().strip().all(2) | cli.deref() # netstat
_portScan = {"lastScan": 0, "data": None} # netstat
@k1lib.patch(os) # netstat
def killPort(port:int, force:bool=False, allowMulti:bool=False): # killPort
"""Kills the process that is listening on a particular port.
Example::
os.killPort(8888) # kill jupyterlab process, if it's running on this port
:param force: if True, will send SIGKILL, else send SIGTERM
:param allowMulti: if True, allows multiple processes listening on the same port, else throws an error when that happens""" # killPort
cli = k1lib.cli # killPort
if time.time() - _portScan["lastScan"] > 5: # killPort
_portScan["data"] = os.netstat() | ~cli.head(1) | cli.cut(3, 6) | cli.apply(cli.op().split(":")[-1], 0) | cli.apply(cli.op().split("/")[0], 1) | cli.toInt(0, 1) | cli.apply(tuple) | cli.unique() | cli.groupBy(0, True) | cli.apply(cli.joinSt(), 1) | cli.deref() | cli.toDict() # killPort
_portScan["lastScan"] = time.time() # killPort
if port not in _portScan["data"]: raise Exception(f"No process detected to be listening on port {port}") # killPort
pids = _portScan["data"][port] # killPort
if len(pids) > 1 and not allowMulti: raise Exception(f"Port {port} has {len(pids)} processes. Situation might be more complex than you think, so aborting killing. Set .allowMulti to True if you really want to do this") # killPort
sig = signal.SIGKILL if force else signal.SIGTERM # killPort
for pid in pids: os.kill(pid, sig) # killPort
try: # these just try to prevent giant images in .data fields from hanging up JupyterLab's contextual help # killPort
import rosbag; bagMsg_oldRepr = rosbag.bag.BagMessage.__repr__ # killPort
@k1lib.patch(rosbag.bag.BagMessage) # killPort
def __repr__(self): # killPort
s = bagMsg_oldRepr(self).split("\n"); ss = [] # killPort
if hasattr(self.message, "data") and isinstance(self.message.data, bytes): # killPort
for line in s: ss.append(f"data: ({len(self.message.data)} length), {self.message.data[:100]}..." if line.startswith("data: [") else line) # killPort
return "\n".join(ss) # killPort
return "\n".join(s) # killPort
@k1lib.patch(rosbag.bag.BagMessage) # killPort
def __str__(self): return self.__repr__() # killPort
except: pass # killPort
try: # killPort
import genpy; genpyMsg_oldRepr = genpy.message.Message.__repr__ # killPort
@k1lib.patch(genpy.message.Message) # killPort
def __repr__(self): # killPort
s = genpyMsg_oldRepr(self).split("\n"); ss = [] # killPort
if hasattr(self, "data") and isinstance(self.data, bytes): # killPort
for line in s: ss.append(f"data: ({len(self.data)} length), {self.data[:100]}..." if line.startswith("data: [") else line) # killPort
return "\n".join(ss) # killPort
return "\n".join(s) # killPort
@k1lib.patch(genpy.message.Message) # killPort
def __str__(self): return self.__repr__() # killPort
except: pass # killPort
def patchPandas(): # patchPandas
"""Patches panda's :class:`pandas.core.series.Series` and
:class:`pandas.core.frame.DataFrame` so that piping works::
pd.read_csv("a.csv")["col3"] | shape()""" # patchPandas
try: # patchPandas
import pandas as pd # patchPandas
except: return # patchPandas
try: # patchPandas
if pd._k1_patched: return # previously when used with forbiddenfruit, apparently its bad for this function to run again, so have a sentinel/guard here to check. Not sure if this is still needed # patchPandas
except: pass # patchPandas
def patch(klass): # patchPandas
oldPdOr = klass.__or__ # patchPandas
def newPdOr(self, v): # patchPandas
if isinstance(v, cli.BaseCli): return NotImplemented # patchPandas
try: return oldPdOr(self, v) # patchPandas
except: warnings.warn(traceback.format_exc()) # patchPandas
klass.__or__ = newPdOr # forbiddenfruit.curse(klass, "__or__", newPdOr) # turns out forbiddenfruit is not needed! # patchPandas
return oldPdOr, newPdOr # patchPandas
try: # patchPandas
patch(pd.core.arraylike.OpsMixin) # patchPandas
pd._k1_patched = True # patchPandas
except Exception as e: warnings.warn(f"Tried to patch __or__ operator of type `pd.core.arraylike.OpsMixin` but can't because: {e}") # patchPandas
patchPandas() # patchPandas
try: # patchPandas
import html, base64 # patchPandas
@k1lib.patch(html) # patchPandas
def b64escape(s): # patchPandas
"""Escape the html by converting the whole thing to base64, then return a string with "atob" function that decodes that mess""" # patchPandas
return f"atob('{base64.b64encode(s.encode()).decode()}')" # patchPandas
except: pass # patchPandas