import k1lib
from k1lib.cli import *
# Public API of this module: only `trace` is exported by a star-import.
__all__ = ["trace"]
# Source of the `idx` value given to each TraceData (called with no arguments in
# TraceData.__init__, then used as the graph node id). Presumably yields unique
# "TD_"-prefixed ids -- confirm against k1lib.AutoIncrement.
traceIdxAuto = k1lib.AutoIncrement(prefix="TD_")
class TraceData:
    """Record describing one node of the trace graph.

    Holds the cli tool that ran (if any), the text describing its input and
    output streams, and a node id taken from ``traceIdxAuto``."""

    def __init__(self, _cli, inS, outS, name=None):
        """
        :param _cli: cli tool this node stands for, or None for synthetic nodes
        :param inS: in and out strings to be displayed in the edges
        :param outS: see ``inS``
        :param name: node label; falls back to the class name of ``_cli`` when falsy"""
        self.idx = traceIdxAuto()
        self.inS = inS
        self.outS = outS
        self.cli = _cli
        self._name = name or _cli.__class__.__name__

    @property
    def name(self):
        """Node label: the base name, followed by the cli's ``_outHint`` when it has one."""
        hint = ""
        # Best effort: `cli` may be None or may not expose `_outHint`. Catch
        # Exception (not a bare except) so KeyboardInterrupt/SystemExit still propagate.
        try:
            hint = f" {self.cli._outHint}"
        except Exception:
            pass
        return self._name + hint

    def __str__(self):
        return f"<TraceData idx='{self.idx}' inS='{self.inS}' outS='{self.outS}' name='{self.name}' cli='{self.cli}'>"
def isMTM(c):
    """Tells whether ``c`` is one of the cli tools that ``__or__`` expands as an
    ".all(), apply" cluster.

    True for any :class:`applyMp` / :class:`applyTh`, and for an :class:`apply`
    whose ``f`` is itself a cli tool and whose ``column`` is None. Anything that
    is not a :class:`BaseCli` gives False."""
    if isinstance(c, BaseCli):
        if isinstance(c, (applyMp, applyTh)):
            return True
        # plain apply only counts when it wraps another cli and targets no specific column
        return isinstance(c, apply) and isinstance(c.f, BaseCli) and c.column is None
    return False
# Exception type for trace misuse. Within this file it is only referenced by a
# commented-out check in _trace.__ror__.
class TraceException(Exception): pass
# Counter whose values are used to build distinct "cluster_<n>" subgraph names in __or__.
clusterAuto = k1lib.AutoIncrement()
# Unique placeholder meaning "no input has been supplied to the trace yet";
# trace.__init__ passes it as the initial input.
emptyInputSentinel = object()
class _trace(BaseCli):
    """Tracer that runs cli tools one at a time and records each step as nodes
    and edges of a graph. The piping logic (``__or__``) is patched onto this
    class separately, further down in this file."""

    def __init__(self, inp, f, g=None, depth=None):
        """Sets up the tracer and the graph it draws into.

        Some notes. ``startTd`` will always try to grab the first thing, ``lastTd``
        will only grab the last thing at the end of each pipe step, hence "last"
        and not "end".

        :param inp: initial input to pipe into other cli tools
        :param f: function used to render a data stream as the text displayed on the edges
        :param g: graph to draw into (documented upstream as a
            :class:`graphviz.dot.Digraph`, hence subgraph, hence no "start" and "end").
            When omitted, a fresh ``k1lib.digraph()`` is created and seeded with a
            "<start>" node; when given, ``lastTd`` starts out as None
        :param depth: depth tracker deciding whether nested cli tools get expanded;
            defaults to ``k1lib.MaxDepth(float("inf"), 0)``"""
        if depth is None:
            depth = k1lib.MaxDepth(float("inf"), 0)
        self.inp = inp  # will change constantly as new clis are being piped into by trace
        self.f = f
        self.depth = depth
        self._reprRO = k1lib.RunOnce()
        if g is None:
            self.lastTd = TraceData(None, None, None, "\\<start\\>")
            self.g = k1lib.digraph()
            self._formNode(self.lastTd)
        else:
            self.g = g
            self.lastTd = None
        # every other time other than the first should not record any data. It should just pass data through
        self.firstTime = True

    def _typehint(self, inp):
        """Passes the incoming type hint through unchanged."""
        return inp

    def _formNode(self, td: TraceData, g=None):
        """Adds a node for ``td`` to ``g`` (or to this tracer's own graph)."""
        (g or self.g).node(td.idx, td.name)

    def _formEdge(self, td1: TraceData, td2: TraceData, g=None, label: str = None):
        """Adds an edge ``td1 -> td2``; does nothing if either end is None.

        The edge label is the first truthy value among ``label``, ``td2.inS``
        and ``td1.outS``."""
        if td1 is None or td2 is None:
            return
        (g or self.g).edge(td1.idx, td2.idx, label=f" {label or td2.inS or td1.outS}")

    def _run(self, c, inp, cliName=None):
        """Takes in cli tool and input, runs it, and get trace data and output"""
        if isinstance(c, op):
            c.ab_solidify()
        # why not "inp | c"? Cause we want to serve plain old functions inside apply too
        out = c(inp) | deref()
        return TraceData(c, f"{self.f(inp)}", f"{self.f(out)}", cliName), out

    def __repr__(self):
        """Displays the graph as SVG through IPython and returns a short marker string.

        :raises RuntimeError: if IPython can't be imported, or if the graph can't
            be rendered to SVG by either of the two hooks tried"""
        try:
            from IPython.core import display as dis
        except ImportError as e:
            raise RuntimeError("You have to install IPython/execute in a notebook first!") from e
        # The "<end>" node is appended only on the first repr (guarded by the RunOnce
        # object); __or__ later refuses to pipe once `_reprRO.value` is set.
        if not self._reprRO():
            td = TraceData(None, self.lastTd.outS, None, "\\<end\\>")
            self._formNode(td)
            self._formEdge(self.lastTd, td)
        # Two SVG hooks are tried in order; presumably the name differs between
        # graphviz releases -- TODO confirm. If both fail, surface the real cause
        # instead of tripping over an unbound `svg` below.
        try:
            svg = self.g._repr_svg_()
        except Exception:
            try:
                svg = self.g._repr_image_svg_xml()
            except Exception as e:
                raise RuntimeError("Can't render the trace graph to svg") from e
        dis.display(dis.SVG(k1lib.scaleSvg(svg)))
        return "<trace object>"

    def __ror__(self, it):
        """Alternative way to specify input."""
        # Receiving input again (possibly due to `.all()`) used to be considered an
        # error; now it just switches the tracer to pass-through mode. Identity
        # comparison is used so that the input's own `__ne__` is never invoked.
        if self.inp is not emptyInputSentinel:
            self.firstTime = False
        self.inp = it | deref()
        return self

    def __iter__(self):
        return iter(self.inp)
def starStr(c):
    """Builds the label of a "*" node: ``"* <hint>"`` when ``c`` exposes an
    ``_outHint`` attribute, plain ``"*"`` otherwise.

    :param c: any object, typically a cli tool"""
    # Best effort lookup; catch Exception rather than everything so that
    # KeyboardInterrupt/SystemExit are not swallowed.
    try:
        return f"* {c._outHint}"
    except Exception:
        return "*"
@k1lib.patch(_trace)
def __or__(self, c):
    """Pipes this trace into cli tool ``c``, running it and recording it in the graph.

    Patched onto :class:`_trace`. While no input has been supplied yet this only
    composes with ``c``. Otherwise ``c`` is run on the current input and, when the
    depth tracker allows and ``c`` is a recognised composite (serial, apply-like,
    oneToMany, mtmS, apply on a column, filt with a cli predicate), its inner clis
    are traced into their own "cluster_<n>" subgraph. Everything else becomes a
    single node.

    :param c: cli tool being piped into
    :returns: ``self`` after a recorded step; the raw output of ``c`` when this
        trace is in pass-through mode (``firstTime`` is False); ``NotImplemented``
        when input is present and ``c`` is not a :class:`BaseCli`
    :raises RuntimeError: if this trace has already been displayed"""
    if self.inp is emptyInputSentinel:
        # do this to separate out potentially a serial right after this, so that trace() is actually in control, and not merge with the outside serial
        if isinstance(c, serial): return serial(self, c)
        return super(_trace, self).__or__(c)
    if not isinstance(c, BaseCli): return NotImplemented
    if self._reprRO.value: raise RuntimeError("Can't pipe this trace() into another cli tool, as it is used! Make a new trace instead.")
    td, out = self._run(c, self.inp) # runs through the entire thing, then decides whether to go into the details or not
    # pass-through mode: nothing is recorded and self.inp is left untouched
    if not self.firstTime: return out
    if not hasattr(self, "startTd"): self.startTd = td; startTdSet = True
    else: startTdSet = False # whether startTd is set lately
    def bypass(): # default connection case, don't go into clis and explore
        self._formNode(td); self._formEdge(self.lastTd, td); self.lastTd = td
    if self.depth and isinstance(c, serial):
        # serial: chain every inner cli through one nested trace drawing into the subgraph
        with self.g.subgraph(name=f"cluster_{clusterAuto()}") as subG:
            subG.attr(label="|, serial")
            t = _trace(self.inp, self.f, subG, self.depth.enter())
            for _c in c.clis: t = t | _c
        self._formEdge(self.lastTd, t.startTd); self.lastTd = t.lastTd
        if startTdSet: self.startTd = t.startTd
    elif self.depth and isMTM(c):
        # NOTE(review): isMTM also accepts applyTh; `_c` stays unbound here unless
        # applyTh is a subclass of apply/applyMp -- confirm.
        if isinstance(c, (apply, applyMp)): _c = c.f
        try: singleInp = self.inp | item()
        except StopIteration: bypass() # no items at all, can't trace!
        else:
            with self.g.subgraph(name=f"cluster_{clusterAuto()}") as subG:
                subG.attr(label=".all(), apply")
                # NOTE(review): `self.inp | item()` is evaluated again here rather than reusing singleInp
                t = _trace(self.inp | item(), self.f, subG, self.depth.enter())
                # o1Td / o2Td are the "*" entry and exit nodes wrapping the inner trace
                o1Td = TraceData(None, self.f(self.inp), None, "*"); self._formNode(o1Td, g=subG); t = t | _c
                o2Td = TraceData(None, self.f(t.inp), None, starStr(c)); self._formNode(o2Td, g=subG)
                t._formEdge(o1Td, t.startTd); t._formEdge(t.lastTd, o2Td); o2Td.outS = self.f(out)
            self._formEdge(self.lastTd, o1Td); self.lastTd = o2Td
            if startTdSet: self.startTd = o1Td
    elif self.depth and isinstance(c, oneToMany):
        # oneToMany: every inner cli gets its own nested trace fed with the same input
        with self.g.subgraph(name=f"cluster_{clusterAuto()}") as subG:
            subG.attr(label="&, oneToMany")
            o1Td = TraceData(None, self.f(self.inp), None, "*"); self._formNode(o1Td, g=subG)
            o2Td = TraceData(None, None, None, starStr(c)); self._formNode(o2Td, g=subG)
            for _c in c.clis:
                t = _trace(self.inp, self.f, subG, self.depth.enter()) | _c
                self._formEdge(o1Td, t.startTd); self._formEdge(t.lastTd, o2Td)
        self._formEdge(self.lastTd, o1Td); self.lastTd = o2Td; o2Td.outS = self.f(out)
        if startTdSet: self.startTd = o1Td
    elif self.depth and isinstance(c, mtmS):
        # mtmS: inner clis are paired positionally with the elements of the input
        with self.g.subgraph(name=f"cluster_{clusterAuto()}") as subG:
            subG.attr(label="+, mtmS")
            o1Td = TraceData(None, self.f(self.inp), None, "*"); self._formNode(o1Td, g=subG)
            o2Td = TraceData(None, None, self.f(out), starStr(c)); self._formNode(o2Td, g=subG)
            for _c, _it in zip(c.clis, self.inp):
                t = _trace(_it, self.f, subG, self.depth.enter()) | _c
                self._formEdge(o1Td, t.startTd); self._formEdge(t.lastTd, o2Td)
        self._formEdge(self.lastTd, o1Td); self.lastTd = o2Td; o2Td.outS = self.f(out)
        if startTdSet: self.startTd = o1Td
    elif self.depth and isinstance(c, apply) and isinstance(c.f, BaseCli) and c.column is not None:
        # apply on a specific column: trace c.f on that column of one sample item
        try: singleInp = self.inp | item()
        except StopIteration: bypass()
        else:
            with self.g.subgraph(name=f"cluster_{clusterAuto()}") as subG:
                subG.attr(label=f"apply (column: {c.column})")
                singleInp = singleInp[c.column]
                t = _trace(singleInp, self.f, subG, self.depth.enter()) | c.f
                o1Td = TraceData(None, self.f(self.inp), None, "*"); self._formNode(o1Td, g=subG)
                o2Td = TraceData(None, self.f(t.inp), None, "*"); self._formNode(o2Td, g=subG)
                t._formEdge(o1Td, t.startTd); t._formEdge(t.lastTd, o2Td); o2Td.outS = self.f(out)
            self._formEdge(self.lastTd, o1Td); self.lastTd = o2Td
            if startTdSet: self.startTd = o1Td
    elif self.depth and isinstance(c, filt) and isinstance(c.predicate, BaseCli):
        # filt with a cli predicate: trace the predicate on one sample item; the
        # predicate's chain starts from and loops back to the filt node itself
        try: singleInp = self.inp | item()
        except StopIteration: bypass()
        else:
            with self.g.subgraph(name=f"cluster_{clusterAuto()}") as subG:
                subG.attr(label=f"filt (column: {c.column})")
                self._formNode(td, g=subG); self._formEdge(self.lastTd, td) # main filt node
                if c.column is not None: singleInp = singleInp[c.column]
                t = _trace(singleInp, self.f, subG, self.depth.enter()) | c.predicate
                tdEndFilt = td; #tdEndFilt = TraceData(None, None, None, "*"); self._formNode(tdEndFilt, g=subG) # can switch between styles
                self._formEdge(td, t.startTd); self._formEdge(t.lastTd, tdEndFilt, label=f"{t.lastTd.outS}")
            self.lastTd = td
            if startTdSet: self.startTd = td
    else: bypass()
    # the output of this step becomes the input of the next pipe
    self.inp = out; return self
class trace(_trace):
    last = None
    """Last instantiated trace object. Access this to view the previous (possibly nested) trace."""

    def __init__(self, f=shape(), maxDepth=float("inf")):
        """Traces out how the data stream is transformed through complex cli tools.
        Example::

            # returns [1, 4, 9, 16], normal command
            range(1, 5) | apply(lambda x: x**2) | deref()
            # traced command, will display how the shapes evolve through cli tools
            range(1, 5) | trace() | apply(lambda x: x**2) | deref()

        There're a lot more instructions and code examples over the tutorial section. Go check it out!

        This also works well with :class:`~k1lib.cli.typehint.tOpt`, and will actually display
        inferred type data in the graph::

            range(5) | tOpt() | trace() | apply(op()**2)

        :param f: function to display the data stream. Defaulted to :class:`~k1lib.cli.utils.shape`,
            and to :class:`~k1lib.cli.utils.iden` if is None.
        :param maxDepth: forwarded to ``k1lib.MaxDepth`` to build the depth tracker
            handed to the underlying tracer"""
        f = f or iden()

        def display(x):
            # Keep edge labels small: at most the first 2 lines of the rendered
            # text, each cut to 50 characters followed by "..." when longer.
            lines = f"{f(x)}".split("\n")[:2]
            return lines | apply(lambda s: f"{s[:50]}..." if len(s) > 50 else s) | join("\n")

        super().__init__(emptyInputSentinel, display, depth=k1lib.MaxDepth(maxDepth))
        # remember the most recently created trace on the class itself
        trace.last = self