Source code for k1lib.cli.optimizations

"""This is for optimizing the hell out of cli tools. Optimizations
that focus around a specific cli should be included close to their
definitions, so this is for optimizations that unusually span multiple
clis, and serve as examples of how to create optimization passes.

See over the `LLVM optimizer tutorial <llvm.html>`_ for more background."""
import k1lib.cli as cli
from k1lib.cli.typehint import *
from k1lib.cli.init import yieldT
from collections import defaultdict
import k1lib, platform

__all__ = ["dummy"]
[docs]def dummy(): # dummy """Does nothing. Only here so that you can read the source code""" # dummy
#tOpt.clearPasses(); tOpt.n = 10 # dummy def setHints(cs, ts, metadata): # setHints s = cs[0]; t = ts[0]; #print(f"hints {metadata} - {[c.__class__.__name__ for c in s.clis]}") # setHints for c in s.clis: c._inHint = t; t = c._typehint(t) or tAny(); c._outHint = t # setHints tOpt.addPass(setHints, [cli.serial], 14) # for adding extra hints to cli objects # setHints def oUnwrapSerial(cs, ts, metadata): # oUnwrapSerial if len(metadata["route"]) < 2: return None # oUnwrapSerial s = metadata["route"][-2] # oUnwrapSerial if s == "apply" or s == "mtmS" or s == "oneToMany": return None # oUnwrapSerial #print(f"unwrap serial {metadata} {[c.__class__.__name__ for c in cs[0].clis]}") # oUnwrapSerial return cs[0].clis # oUnwrapSerial tOpt.addPass(oUnwrapSerial , [cli.serial], 1) # for unwrapping serial # oUnwrapSerial tOpt.addPass(lambda cs, ts, _: [cs[1], cs[0]], [cli.toList, cli.head]) # for swapping heads around # oUnwrapSerial def stripSerial(c): # stripSerial if c is None: return None # stripSerial while isinstance(c, cli.serial) and len(c.clis) == 1: c = c.clis[0] # stripSerial return c # stripSerial def prepareSerial(c): # basically prepares input clis so that they can be recursively optimized by `apply` and `mtmS` # prepareSerial c = stripSerial(c); return cli.serial(c) if isinstance(c, cli.serial) else cli.serial(cli.serial(c)) # prepareSerial def oApply(cs, ts, metadata): # for going into apply # oApply a = cs[0]; t = ts[0]; #print(f"apply {metadata} {cs} {ts} {a.f.clis}") # oApply if a.column is None and isinstance(a.f, cli.BaseCli): # oApply metadata["route"].append("apply"); # oApply res = stripSerial(cli.typehint.serialOpt(prepareSerial(a.f), t.item(), metadata)) # oApply metadata["route"].pop(); #print(f"res: {res}") # oApply if res is not None: return [cli.apply(res)] # oApply return None # oApply tOpt.addPass(oApply, [cli.apply]) # oApply def oMtmS(cs, ts, metadata): # oMtmS m = cs[0]; n = len(m.clis); newClis = []; atLeastOnce = False # oMtmS ts = m._inpTypeHintExpand(ts[0]) # oMtmS metadata["route"].append("mtmS") # oMtmS for c, t in zip(m.clis, ts): # oMtmS res = stripSerial(cli.typehint.serialOpt(prepareSerial(c), t, metadata)) # oMtmS if res is not None: atLeastOnce = True # oMtmS newClis.append(res) # oMtmS metadata["route"].pop() # oMtmS if atLeastOnce: return [cli.mtmS(*newClis)] # oMtmS tOpt.addPass(oMtmS, [cli.mtmS]) # oMtmS def oOneToMany(cs, ts, metadata): # oOneToMany o = cs[0]; t = ts[0]; atLeastOnce = False; newClis = [] # oOneToMany metadata["route"].append("oneToMany") # oOneToMany for c in o.clis: # oOneToMany res = stripSerial(cli.typehint.serialOpt(prepareSerial(c), t, metadata)) # oOneToMany if res is not None: atLeastOnce = True # oOneToMany newClis.append(res) # oOneToMany metadata["route"].pop() # oOneToMany if atLeastOnce: return [cli.oneToMany(*newClis)] # oOneToMany tOpt.addPass(oOneToMany, [cli.oneToMany]) # oOneToMany def basics(): # basics tOpt.addPass(oApply, [cli.apply]) # basics tOpt.addPass(oMtmS, [cli.mtmS]) # basics tOpt.addPass(oOneToMany, [cli.oneToMany]) # basics def oFileLength(cs, ts, _): # oFileLength c, s = cs; # oFileLength if s.idx != 0: return None # oFileLength return [cli.aS(lambda fn: None | cli.cmd(f"wc -l {fn}") | cli.item() | cli.op().split(" ")[0].ab_int())] # oFileLength if platform.system() in ["Linux", "Darwin"]: tOpt.addPass(oFileLength, [, cli.shape], 9) # oFileLength