Source code for k1lib.cli.trace

# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
import k1lib
from k1lib.cli import *
__all__ = ["trace"]
traceIdxAuto = k1lib.AutoIncrement(prefix="TD_")
class TraceData:                                                                 # TraceData
    def __init__(self, _cli, inS, outS, name=None):                              # TraceData
        """
:param inS: in and out strings to be displayed in the edges"""                   # TraceData
        self.idx = traceIdxAuto()                                                # TraceData
        self.inS = inS; self.outS = outS; self.cli = _cli                        # TraceData
        self._name = name or _cli.__class__.__name__                             # TraceData
    @property                                                                    # TraceData
    def name(self):                                                              # TraceData
        hint = ""                                                                # TraceData
        try: hint = f" {self.cli._outHint}"                                      # TraceData
        except: pass                                                             # TraceData
        return self._name + hint                                                 # TraceData
    def __str__(self):                                                           # TraceData
        return f"<TraceData idx='{self.idx}' inS='{self.inS}' outS='{self.outS}' name='{self.name}' cli='{self.cli}'>" # TraceData
def isMTM(c):                                                                    # isMTM
    if not isinstance(c, BaseCli): return False                                  # isMTM
    if isinstance(c, (applyMp, applyTh)): return True                            # isMTM
    if isinstance(c, apply) and isinstance(c.f, BaseCli) and c.column is None: return True # isMTM
    return False                                                                 # isMTM
class TraceException(Exception): pass                                            # TraceException
clusterAuto = k1lib.AutoIncrement()                                              # TraceException
emptyInputSentinel = object()                                                    # TraceException
class _trace(BaseCli):                                                           # _trace
    def __init__(self, inp, f, g=None, depth=None):                              # _trace
        """
Some notes. startTd will always tries to grab the first thing, lastTd will only
grab the last thing at the end of __ror__, hence "last" and not "end".

:param inp: initial input to pipe into other cli tools
:param f: function to display result of cli tools, default just shows the shape of the stream
:param env: :class:`graphviz.dot.Digraph` to use (hence subgraph, hence no "start" and "end")""" # _trace
        if depth is None: depth = k1lib.MaxDepth(float("inf"), 0)                # _trace
        self.inp = inp # will change constantly as new clis are being piped into by trace # _trace
        self.f = f; self.depth = depth; self._reprRO = k1lib.RunOnce()           # _trace
        if g is None:                                                            # _trace
            self.lastTd = TraceData(None, None, None, "\\<start\\>")             # _trace
            self.g = k1lib.digraph(); self._formNode(self.lastTd)                # _trace
        else: self.g = g; self.lastTd = None                                     # _trace
        self.firstTime = True # every other time other than the first should not record any data. It should just pass data through # _trace
    def _typehint(self, inp): return inp                                         # _trace
    def _formNode(self, td:TraceData, g=None): (g or self.g).node(td.idx, td.name) # _trace
    def _formEdge(self, td1:TraceData, td2:TraceData, g=None, label:str=None):   # _trace
        if td1 is None or td2 is None: return                                    # _trace
        (g or self.g).edge(td1.idx, td2.idx, label=f" {label or td2.inS or td1.outS}") # _trace
    def _run(self, c, inp, cliName=None):                                        # _trace
        """Takes in cli tool and input, runs it, and get trace data and output""" # _trace
        if isinstance(c, op): c.ab_solidify()                                    # _trace
        out = c(inp) | deref() # why not "inp | c"? Cause we want to serve plain old functions inside apply too # _trace
        return TraceData(c, f"{self.f(inp)}", f"{self.f(out)}", cliName), out    # _trace
    def __repr__(self):                                                          # _trace
        try: from IPython.core import display as dis                             # _trace
        except: raise RuntimeError("You have to install IPython/execute in a notebook first!") # _trace
        if not self._reprRO():                                                   # _trace
            td = TraceData(None, self.lastTd.outS, None, "\\<end\\>")            # _trace
            self._formNode(td); self._formEdge(self.lastTd, td)                  # _trace
        try: svg = self.g._repr_svg_()                                           # _trace
        except:                                                                  # _trace
            try: svg = self.g._repr_image_svg_xml()                              # _trace
            except: pass                                                         # _trace
        dis.display(dis.SVG(k1lib.scaleSvg(svg))); return "<trace object>"       # _trace
    def __ror__(self, it):                                                       # _trace
        """Alternative way to specify input."""                                  # _trace
        #if self.inp != emptyInputSentinel: raise TraceException("Input to trace has already been set, but it's being set again (possibly due to `.all()`). Check last trace using ``trace.last``") # _trace
        if self.inp != emptyInputSentinel: self.firstTime = False                # _trace
        self.inp = it | deref(); return self                                     # _trace
    def __iter__(self): return iter(self.inp)                                    # _trace
def starStr(c):                                                                  # starStr
    try: return f"* {c._outHint}"                                                # starStr
    except: return "*"                                                           # starStr
@k1lib.patch(_trace)                                                             # starStr
def __or__(self, c):                                                             # __or__
    if self.inp is emptyInputSentinel:                                           # __or__
        # do this to separate out potentially a serial right after this, so that trace() is actually in control, and not merge with the outside serial # __or__
        if isinstance(c, serial): return serial(self, c)                         # __or__
        return super(_trace, self).__or__(c)                                     # __or__
    if not isinstance(c, BaseCli): return NotImplemented                         # __or__
    if self._reprRO.value: raise RuntimeError("Can't pipe this trace() into another cli tool, as it is used! Make a new trace instead.") # __or__
    td, out = self._run(c, self.inp) # runs through the entire thing, then decides whether to go into the details or not # __or__
    if not self.firstTime: return out                                            # __or__
    if not hasattr(self, "startTd"): self.startTd = td; startTdSet = True        # __or__
    else: startTdSet = False # whether startTd is set lately                     # __or__
    def bypass(): # default connection case, don't go into clis and explore      # __or__
        self._formNode(td); self._formEdge(self.lastTd, td); self.lastTd = td    # __or__
    if self.depth and isinstance(c, serial):                                     # __or__
        with self.g.subgraph(name=f"cluster_{clusterAuto()}") as subG:           # __or__
            subG.attr(label="|, serial")                                         # __or__
            t = _trace(self.inp, self.f, subG, self.depth.enter())               # __or__
            for _c in c.clis: t = t | _c                                         # __or__
        self._formEdge(self.lastTd, t.startTd); self.lastTd = t.lastTd           # __or__
        if startTdSet: self.startTd = t.startTd                                  # __or__
    elif self.depth and isMTM(c):                                                # __or__
        if isinstance(c, (apply, applyMp)): _c = c.f                             # __or__
        try: singleInp = self.inp | item()                                       # __or__
        except StopIteration: bypass() # no items at all, can't trace!           # __or__
        else:                                                                    # __or__
            with self.g.subgraph(name=f"cluster_{clusterAuto()}") as subG:       # __or__
                subG.attr(label=".all(), apply")                                 # __or__
                t = _trace(self.inp | item(), self.f, subG, self.depth.enter())  # __or__
                o1Td = TraceData(None, self.f(self.inp), None, "*");        self._formNode(o1Td, g=subG); t = t | _c # __or__
                o2Td = TraceData(None, self.f(t.inp),    None, starStr(c)); self._formNode(o2Td, g=subG) # __or__
                t._formEdge(o1Td, t.startTd); t._formEdge(t.lastTd, o2Td); o2Td.outS = self.f(out) # __or__
            self._formEdge(self.lastTd, o1Td); self.lastTd = o2Td                # __or__
            if startTdSet: self.startTd = o1Td                                   # __or__
    elif self.depth and isinstance(c, oneToMany):                                # __or__
        with self.g.subgraph(name=f"cluster_{clusterAuto()}") as subG:           # __or__
            subG.attr(label="&, oneToMany")                                      # __or__
            o1Td = TraceData(None, self.f(self.inp), None, "*"); self._formNode(o1Td, g=subG) # __or__
            o2Td = TraceData(None, None, None,      starStr(c)); self._formNode(o2Td, g=subG) # __or__
            for _c in c.clis:                                                    # __or__
                t = _trace(self.inp, self.f, subG, self.depth.enter()) | _c      # __or__
                self._formEdge(o1Td, t.startTd); self._formEdge(t.lastTd, o2Td)  # __or__
        self._formEdge(self.lastTd, o1Td); self.lastTd = o2Td; o2Td.outS = self.f(out) # __or__
        if startTdSet: self.startTd = o1Td                                       # __or__
    elif self.depth and isinstance(c, mtmS):                                     # __or__
        with self.g.subgraph(name=f"cluster_{clusterAuto()}") as subG:           # __or__
            subG.attr(label="+, mtmS")                                           # __or__
            o1Td = TraceData(None, self.f(self.inp), None, "*");   self._formNode(o1Td, g=subG) # __or__
            o2Td = TraceData(None, None, self.f(out), starStr(c)); self._formNode(o2Td, g=subG) # __or__
            for _c, _it in zip(c.clis, self.inp):                                # __or__
                t = _trace(_it, self.f, subG, self.depth.enter()) | _c           # __or__
                self._formEdge(o1Td, t.startTd); self._formEdge(t.lastTd, o2Td)  # __or__
        self._formEdge(self.lastTd, o1Td); self.lastTd = o2Td; o2Td.outS = self.f(out) # __or__
        if startTdSet: self.startTd = o1Td                                       # __or__
    elif self.depth and isinstance(c, apply) and isinstance(c.f, BaseCli) and c.column is not None: # __or__
        try: singleInp = self.inp | item()                                       # __or__
        except StopIteration: bypass()                                           # __or__
        else:                                                                    # __or__
            with self.g.subgraph(name=f"cluster_{clusterAuto()}") as subG:       # __or__
                subG.attr(label=f"apply (column: {c.column})")                   # __or__
                singleInp = singleInp[c.column]                                  # __or__
                t = _trace(singleInp, self.f, subG, self.depth.enter()) | c.f    # __or__
                o1Td = TraceData(None, self.f(self.inp), None, "*"); self._formNode(o1Td, g=subG) # __or__
                o2Td = TraceData(None, self.f(t.inp),    None, "*"); self._formNode(o2Td, g=subG) # __or__
                t._formEdge(o1Td, t.startTd); t._formEdge(t.lastTd, o2Td); o2Td.outS = self.f(out) # __or__
            self._formEdge(self.lastTd, o1Td); self.lastTd = o2Td                # __or__
            if startTdSet: self.startTd = o1Td                                   # __or__
    elif self.depth and isinstance(c, filt) and isinstance(c.predicate, BaseCli): # __or__
        try: singleInp = self.inp | item()                                       # __or__
        except StopIteration: bypass()                                           # __or__
        else:                                                                    # __or__
            with self.g.subgraph(name=f"cluster_{clusterAuto()}") as subG:       # __or__
                subG.attr(label=f"filt (column: {c.column})")                    # __or__
                self._formNode(td, g=subG); self._formEdge(self.lastTd, td) # main filt node # __or__
                if c.column is not None: singleInp = singleInp[c.column]         # __or__
                t = _trace(singleInp, self.f, subG, self.depth.enter()) | c.predicate # __or__
                tdEndFilt = td; #tdEndFilt = TraceData(None, None, None, "*"); self._formNode(tdEndFilt, g=subG) # can switch between styles # __or__
                self._formEdge(td, t.startTd); self._formEdge(t.lastTd, tdEndFilt, label=f"{t.lastTd.outS}") # __or__
                self.lastTd = td                                                 # __or__
            if startTdSet: self.startTd = td                                     # __or__
    else: bypass()                                                               # __or__
    self.inp = out; return self                                                  # __or__
[docs]class trace(_trace): # trace last = None # trace """Last instantiated trace object. Access this to view the previous (possibly nested) trace.""" # trace
[docs] def __init__(self, f=shape(), maxDepth=float("inf")): # trace """Traces out how the data stream is transformed through complex cli tools. Example:: # returns [1, 4, 9, 16], normal command range(1, 5) | apply(lambda x: x**2) | deref() # traced command, will display how the shapes evolve through cli tools range(1, 5) | trace() | apply(lambda x: x**2) | deref() There're a lot more instructions and code examples over the tutorial section. Go check it out! This also works well with :class:`~k1lib.cli.typehint.tOpt`, and will actually display inferred type data in the graph:: range(5) | tOpt() | trace() | apply(op()**2) :param f: function to display the data stream. Defaulted to :class:`~k1lib.cli.utils.shape`, and to :class:`~k1lib.cli.utils.iden` if is None.""" # trace f = f or iden() # trace g = lambda x: f"{f(x)}".split("\n")[:2] | apply(lambda s: f"{s[:50]}..." if len(s) > 50 else s) | join("\n") # trace super().__init__(emptyInputSentinel, g, depth=k1lib.MaxDepth(maxDepth)) # trace trace.last = self # trace