# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
"""
Lots of type hint mechanisms to be used by the `LLVM optimizer <llvm.html>`_
"""
import k1lib.cli as cli
import k1lib, itertools, copy, numbers; import numpy as np
from k1lib.cli.init import yieldT
from typing import List
from collections import defaultdict, deque
try: import torch; hasTorch = True
except: hasTorch = False; torch = k1lib.Object().withAutoDeclare(lambda: type("RandomClass", (object, ), {}))
# Names exported by `from k1lib.cli.typehint import *`
__all__ = [
    "tBase", "tAny", "tList", "tIter", "tSet", "tCollection", "tExpand",
    "tNpArray", "tTensor",
    "tListIterSet", "tListSet", "tListIter", "tArrayTypes",
    "inferType", "TypeHintException", "tLowest", "tCheck", "tOpt",
]
# Shared cli settings object; the optimizer's options are registered under `settings.llvm`
settings = k1lib.settings.cli
settings.add(
    "llvm", k1lib.Settings(),
    "settings related to LLVM-inspired optimizer `tOpt`. See more at module `k1lib.cli.typehint`")
settings.llvm.add(
    "k1a", True,
    "utilize the supplementary C-compiled library automatically for optimizations")
class TypeHintException(Exception):
    """Exception type raised by the type hint utilities in this module, e.g.
    for malformed :class:`tCollection` definitions or failed :class:`tCheck` runs."""
def klassName(self):
    """Short display name of ``self``.

    - :class:`tBase` instances: name of their class
    - anything with a ``__name__`` (classes, functions): that name
    - everything else: the plain string form of the object"""
    if not isinstance(self, tBase):
        try: return f"{self.__name__}"
        except: return f"{self}"
    return self.__class__.__name__
def klassRepr(self):
    """Display form used when nesting hints inside each other: full repr for
    :class:`tBase` instances, :func:`klassName` for everything else."""
    if isinstance(self, tBase):
        return f"{self}"
    return klassName(self)
class tBase:
    """Base class of all type hints in this module. Holds a single ``child``
    type, where ``type(None)`` is the marker for "no child specified"."""
    def __init__(self, child=type(None)):
        self.child = child

    def __repr__(self):
        name = klassName(self); inner = klassRepr(self.child)
        return f"<{name} {inner}>"

    def __eq__(self, v):
        # equal only if exact same hint class and same child
        if not isinstance(v, tBase) or self.__class__ != v.__class__:
            return False
        return not (self.child != v.child)

    def check(self, v):
        """Checks whether an object adheres to this type hint or not.
        Subclasses return :attr:`~k1lib.cli.init.yieldT` if the object does not
        adhere, and the object itself if it does. Note that if the object is
        an iterator, a new iterator containing all elements of the old one is
        returned instead. This base implementation returns ``NotImplemented``."""
        return NotImplemented

    def item(self):
        """Gets the child type of this type. Basically what's the type if
        it were to go through :class:`~k1lib.cli.utils.item`. Example::

            # returns tTensor(torch.float32, 2)
            tTensor(torch.float32, 3).item()
        """
        if self.child is type(None):
            return tAny()
        return self.child

    def expand(self, n) -> List["tBase"]:
        """Expands the type to a list with ``n`` elements.
        Example::

            # returns [int, int, int, int]
            tList(int).expand(4)
            # returns [int, float, float, str]
            tCollection(int, tExpand(float), str).expand(4)
        """
        elem = tAny() if self.child is type(None) else self.child
        return [elem] * n

    def __hash__(self):
        key = f"{self.__class__} {self.child}"
        return hash(key)
def checkF(t):
    """Builds a checking function for type ``t``. The returned function takes
    an object and returns either that object (check passed) or ``yieldT``
    (check failed).

    :param t: a :class:`tBase` hint (its ``check`` method is used), or anything
        usable as the second argument of ``isinstance``. If ``isinstance`` rejects
        ``t`` with a TypeError and the object is a torch tensor, ``t`` is compared
        against the tensor's dtype instead."""
    if isinstance(t, (tBase, cli.typehint.tBase)):
        return t.check

    def inner(x):
        try:
            if isinstance(x, t): return x
            return yieldT
        except TypeError:
            # `t` is not a class, could be a torch dtype
            if hasTorch and isinstance(x, torch.Tensor):
                if x.dtype == t: return x
                return yieldT
            return yieldT
        except Exception as e:
            print(x, t); raise e
    return inner
class tAny(tBase):
    """Wildcard type hint: every object passes its check, and all
    :class:`tAny` instances are equal to each other."""
    def __init__(self):
        super().__init__()

    def __repr__(self):
        return f"<{klassName(self)}>"

    def __eq__(self, v):
        return isinstance(v, tAny)

    def check(self, v):
        """Always passes, returns ``v`` unchanged."""
        return v

    def item(self):
        """Elements of "anything" are also "anything"."""
        return tAny()

    def __hash__(self):
        return hash("tAny")
class tIter(tBase):
    """Type hint for an arbitrary iterable whose elements all have type ``child``."""
    def check(self, v):
        """Checks every element of ``v`` against the child type.

        :param v: the iterable to check. It is consumed in the process
        :return: a fresh iterator over the checked elements, or ``yieldT`` as
            soon as one element fails"""
        checker = checkF(self.child); checked = []
        for e in v:
            x = checker(e)
            # identity test, not `==`: elements like numpy arrays or objects with
            # a custom __eq__ don't give a usable boolean when compared to yieldT
            if x is yieldT: return yieldT
            checked.append(x)
        return iter(checked)
class tList(tBase):
    """Type hint for a concrete sequence (list, tuple or range) whose
    elements all have type ``child``."""
    def check(self, v):
        """Returns ``v`` itself if it is a list/tuple/range and every element
        passes the child check, else ``yieldT``."""
        isSeq = isinstance(v, (list, tuple, range))
        if isSeq and tIter(self.child).check(v) is not yieldT:
            return v
        return yieldT
class tSet(tBase):
    """Type hint for a ``set`` whose elements all have type ``child``."""
    def check(self, v):
        """Returns ``v`` itself if it is a set and every element passes the
        child check, else ``yieldT``."""
        # failure has to be signalled with yieldT (not False), as callers test `is yieldT`
        if not isinstance(v, set): return yieldT
        if tIter(self.child).check(v) is yieldT: return yieldT
        return v
# Class groups meant for `isinstance(hint, ...)` checks on container-like hints
tListIter = (tList, tIter)
tListSet = (tList, tSet)
tListIterSet = (*tListIter, tSet)
class tDict(tBase):
    def __init__(self, keys, values):
        """Dictionary type.
        Example::

            d = tDict(tIter(str), tIter(int))
            # returns {"a": 3} dict, so check passed
            d.check({"a": 3})

        :param keys: type hint applied to the list of all keys
        :param values: type hint applied to the list of all values"""
        super().__init__(); self.keys = keys; self.values = values

    def check(self, v):
        """Returns a new dict built from the checked keys and values, or
        ``yieldT`` if ``v`` is not a dict or the keys/values don't pass."""
        if not isinstance(v, dict): return yieldT
        ks = self.keys.check(list(v.keys()))
        vs = self.values.check(list(v.values()))
        if ks is yieldT or vs is yieldT: return yieldT
        return dict(zip(ks, vs))

    def __eq__(self, v):
        if not isinstance(v, tDict): return False
        if self.keys != v.keys: return False
        if self.values != v.values: return False
        return True

    def __hash__(self):
        # defining __eq__ alone would make instances unhashable; keep them hashable
        # like the other hints, using the same string-based scheme as tBase
        return hash(f"{self.__class__} {self.keys} {self.values}")

    def __repr__(self):
        return f"<{klassName(self)} {klassRepr(self.keys)} {klassRepr(self.values)}>"
class tNpArray(tBase):
    def __init__(self, child=None, rank=None):
        """Numpy array type.
        Example::

            # returns np.array([2, 3])
            tNpArray(np.int64, 1).check(np.array([2, 3]))

        :param child: the dtype of the array
        :param rank: the rank/dimension of the array"""
        super().__init__(child)
        self.rank = rank

    def check(self, v):
        """Passes (returns ``v``) if ``v`` is a numpy array and, when a rank is
        specified, has that many dimensions. The dtype is not inspected."""
        if not isinstance(v, np.ndarray):
            return yieldT
        rankMismatch = self.rank is not None and self.rank != len(v.shape)
        return yieldT if rankMismatch else v

    def __repr__(self):
        return f"<tNpArray {klassName(self.child)} rank={self.rank}>"

    def item(self):
        """Type of a single element along the first axis: an array hint with one
        less rank, the bare dtype once the rank drops to 1 or below, or an
        array hint of unknown rank if this rank is unknown."""
        if self.rank is None:
            return tNpArray(self.child, None)
        if self.rank > 1:
            return tNpArray(self.child, self.rank - 1)
        return self.child

    def __eq__(self, v):
        # a `None` dtype or rank on either side acts as a wildcard
        if not isinstance(v, tNpArray):
            return False
        bothDtypes = self.child is not None and v.child is not None
        if bothDtypes and self.child != v.child:
            return False
        return self.rank is None or v.rank is None or self.rank == v.rank

    def __hash__(self):
        # NOTE(review): wildcard-equal hints can still hash differently
        return hash(f"{self.child} - {self.rank}")

    def expand(self, n):
        """Returns a list of ``n`` copies of :meth:`item`."""
        return [self.item()] * n
if hasTorch:
    class tTensor(tBase):
        def __init__(self, child=None, rank=None):
            """PyTorch tensor type.
            Example::

                # returns torch.tensor([2.0, 3.0])
                tTensor(torch.float32, 1).check(torch.tensor([2.0, 3.0]))

            :param child: the dtype of the array
            :param rank: the rank/dimension of the tensor"""
            super().__init__(child)
            self.rank = rank

        def check(self, v):
            """Passes (returns ``v``) if ``v`` is a torch tensor and, when a rank
            is specified, has that many dimensions. The dtype is not inspected."""
            if not isinstance(v, torch.Tensor):
                return yieldT
            rankMismatch = self.rank is not None and self.rank != len(v.shape)
            return yieldT if rankMismatch else v

        def __repr__(self):
            return f"<tTensor {klassName(self.child)} rank={self.rank}>"

        def item(self):
            """Type of a single element along the first axis: a tensor hint with
            one less rank, the bare dtype once the rank drops to 1 or below, or
            a tensor hint of unknown rank if this rank is unknown."""
            if self.rank is None:
                return tTensor(self.child, None)
            if self.rank > 1:
                return tTensor(self.child, self.rank - 1)
            return self.child

        def __eq__(self, v):
            # a `None` dtype or rank on either side acts as a wildcard
            if not isinstance(v, tTensor):
                return False
            bothDtypes = self.child is not None and v.child is not None
            if bothDtypes and self.child != v.child:
                return False
            return self.rank is None or v.rank is None or self.rank == v.rank

        def __hash__(self):
            return hash(f"{self.child} - {self.rank}")

        def expand(self, n):
            """Returns a list of ``n`` copies of :meth:`item`."""
            return [self.item()] * n
    tArrayTypes = (tNpArray, tTensor)
else:
    class tTensor(tBase):
        """Placeholder so that the name exists when PyTorch is not installed."""
    tArrayTypes = (tNpArray,)
class tCollection(tBase):
    def __init__(self, *children):
        """Fixed-length collection of things. Let's say you want a tuple with
        5 values::

            a = [3, [2, 3], "e", 2.0, b'3']

        Then, this would be represented like this::

            tCollection(int, tList(int), str, float, bytes)

        This also works in conjunction with :class:`tExpand`, like this::

            a = [3, [2, 3], "e", 2.0, 3.0]
            tCollection(int, tList(int), str, tExpand(float))

        :param children: type of each slot. At most one can be a :class:`tExpand`
        :raises TypeHintException: if there is more than one :class:`tExpand`"""
        super().__init__(None); self.children = list(children)
        expandIdxs = [i for i, e in enumerate(children) if isinstance(e, tExpand)]
        if len(expandIdxs) > 1: raise TypeHintException("Can't have 2 `tExpand` in a `tCollection`")
        self.nChildren = len(children) - len(expandIdxs) # minimum number of children possible
        self.expandIdx = expandIdxs[0] if expandIdxs else -1 # -1 means no tExpand slot

    def __repr__(self):
        a = ' '.join(klassRepr(c) for c in self.children)
        return f"<{klassName(self)} {a}>"

    def __eq__(self, v):
        if not isinstance(v, tCollection): return False
        if len(self.children) != len(v.children): return False
        return not any(x != y for x, y in zip(self.children, v.children))

    def __hash__(self):
        # defining __eq__ alone would make instances unhashable, which breaks set
        # membership tests on hints; same string-based scheme as tBase
        return hash(f"{self.__class__} {self.children}")

    def check(self, v):
        """Checks each element of ``v`` against the type of its slot.

        :param v: the collection to check. Gets materialized into a list
        :return: ``yieldT`` on failure. Otherwise the checked elements, as the
            same type as ``v`` if that is a list/tuple, else as a plain list"""
        t = type(v) if isinstance(v, (list, tuple)) else None
        try: v = list(v)
        except TypeError: return yieldT # not iterable, so can't be a collection
        if self.expandIdx >= 0:
            idx = self.expandIdx
            nMatchExpand = len(v) - (len(self.children) - 1)
            if nMatchExpand < 0: return yieldT # too short to fill all the fixed slots
            # lay out one hint per element: fixed head, repeated tExpand, fixed tail
            hints = [*self.children[:idx], *[self.children[idx]]*nMatchExpand, *self.children[idx+1:]]
        else:
            # NOTE(review): zip() below stops at the shorter side, so a length
            # mismatch is not reported here. Kept as-is for compatibility
            hints = self.children
        l = []
        for c, e in zip(hints, v):
            x = checkF(c)(e)
            if x is yieldT: return yieldT
            l.append(x)
        return t(l) if t else l

    def reduce(self):
        """Tries to reduce ``tCollection(int, int)`` to ``tIter(int)`` if possible.
        Returns itself if the children differ, or if there are no children."""
        if not self.children: return self
        s = self.children[0]
        if any(s != e for e in self.children): return self
        return tIter(s)

    def item(self):
        """Lowest shared type of all slots, with :class:`tExpand` slots
        contributing their inner type."""
        return tLowest(*((t.child if isinstance(t, tExpand) else t) for t in self.children))

    def expand(self, n:int) -> List[tBase]:
        """Expands out this collection so that it has a specified length.

        :param n: desired number of elements
        :return: list of types. With a :class:`tExpand` slot, that slot is
            repeated to fill up the length. Without one, the children are
            returned if their count is ``n``, else ``n`` copies of :meth:`item`"""
        if self.expandIdx >= 0:
            ts = []
            for t in self.children:
                if isinstance(t, tExpand): ts.extend([t.child] * (n - len(self.children) + 1))
                else: ts.append(t)
            return ts
        if len(self.children) == n: return list(self.children)
        # doesn't make sense, so default case should return to list of lowest child
        return [self.item()]*n
class tExpand(tBase):
    def __init__(self, child):
        """Supplement to :class:`tCollection`, marking the slot that can
        repeat.

        :param child: type of each repeated element"""
        super().__init__(child)

    def check(self, v):
        """Checks a single element ``v`` against the child type."""
        checker = checkF(self.child)
        return checker(v)
# Registers the tuple of types that `inferType` treats as atomic: for instances of
# these, it returns `type(o)` directly instead of looking inside the object
settings.atomic.add("typeHint", (numbers.Number, np.number, str, bool, bytes), "atomic types used for infering type of object for optimization passes")
def inferType(o):
    """Tries to infer the type of the input.
    Example::

        # returns tList(int)
        inferType(range(10))
        # returns tTensor(torch.float32, 2)
        inferType(torch.randn(2, 3))

    Lists and tuples are inferred element by element: same type everywhere
    gives a :class:`tList`, mixed types give a :class:`tCollection` (under 100
    elements) or a :class:`tList` of their lowest shared type. Anything
    unrecognized gives :class:`tAny`."""
    if isinstance(o, range):
        return tList(int)
    if isinstance(o, settings.atomic.typeHint):
        return type(o)
    if isinstance(o, np.ndarray):
        return tNpArray(o.dtype, len(o.shape))
    if hasTorch and isinstance(o, torch.Tensor):
        return tTensor(o.dtype, len(o.shape))
    if isinstance(o, (list, tuple)):
        arr = [inferType(e) for e in o]
        first = arr[0] if arr else None # stays None for empty input
        mixed = False
        for t in arr:
            if first != t: mixed = True
        if not mixed:
            return tList(first)
        if len(arr) < 100:
            return tCollection(*arr)
        return tList(tLowest(*arr))
    if isinstance(o, dict):
        keysType = inferType(list(o.keys()))
        return tDict(keysType, inferType(list(o.values())))
    return tAny()
def lowestChild(t):
    """Gets the element type of a container-like type hint.

    :param t: a :class:`tCollection`, list/iter/set hint or array hint
    :return: lowest shared type of the children for collections, the child for
        list/iter/set hints. For array hints, the dtype if the rank is 1 or
        unknown, else an array hint with one less rank
    :raises TypeHintException: if ``t`` is none of the above"""
    if isinstance(t, tCollection):
        # a tExpand slot stands for elements of its inner type, so unwrap it
        return tLowest(*((c.child if isinstance(c, tExpand) else c) for c in t.children))
    if isinstance(t, tListIterSet): return t.child
    if isinstance(t, tArrayTypes):
        if t.rank is None or t.rank == 1: return t.child
        return t.__class__(t.child, t.rank - 1)
    raise TypeHintException(f"Type {t} does not have a lowest child")
# Scalar type families that `tLowest` uses to pick the lowest shared numeric type
intTypes = {int, np.int8, np.int16, np.int32, np.int64, torch.int8, torch.int16, torch.int32, torch.int64}
floatTypes = {float, np.float16, np.float32, np.float64, torch.float16, torch.float32, torch.float64, torch.bfloat16}
try: floatTypes.add(np.float128) # some systems don't have float128
except AttributeError: pass # only the missing attribute is expected here
intFloatTypes = {*intTypes, *floatTypes}
numericTypes = {*intTypes, *floatTypes, complex, numbers.Number}
def allSame(l):
    """Returns whether every element of ``l`` equals the first one.
    Empty input gives True."""
    for t in l:
        if not (t == l[0]): return False
    return True
def _inTypes(t, types):
    """Set membership test that treats unhashable objects as "not a member"."""
    try: return t in types
    except TypeError: return False

def tLowest(*ts):
    """Grabs the lowest possible shared type of all the example types.
    Example::

        # returns tIter(float)
        tLowest(tIter(float), tList(int))

    :param ts: the types to unify
    :return: the shared type, or :class:`tAny` if there is none or if no
        types are given"""
    # nothing to unify. Also, every `all(...)` below is True for empty input,
    # which would otherwise recurse without end
    if len(ts) == 0: return tAny()
    # sort of like array types?
    if all(isinstance(t, tArrayTypes) for t in ts):
        if all(isinstance(t, tTensor) for t in ts) or all(isinstance(t, tNpArray) for t in ts):
            t = ts[0]; rank = t.rank if allSame([t.rank for t in ts]) else None
            child = t.child if allSame([t.child for t in ts]) else None
            return t.__class__(child, rank)
    # sort of like list?
    if all(isinstance(t, (tList, tIter, tSet, *tArrayTypes, tCollection)) for t in ts):
        lC = tLowest(*(lowestChild(t) for t in ts))
        if any(isinstance(t, (tIter, tCollection)) for t in ts): return tIter(lC)
        return tList(lC)
    # all numeric?
    if all(_inTypes(t, numericTypes) for t in ts):
        if all(_inTypes(t, intTypes) for t in ts): return int
        if all(_inTypes(t, intFloatTypes) for t in ts): return float
        return numbers.Number
    return tAny()
def _tCheck(inp, op):
    """Runs ``inp | op`` and verifies the type hints around it.

    Three checks are made: the input against its inferred type, the output
    against its inferred type, and the output against the type that
    ``op._typehint`` predicts from the input type.

    :return: the output, after passing through the checks
    :raises TypeHintException: if any check fails. Details are also saved to
        the module-level ``tCheckData``"""
    global tCheckData
    inHint = inferType(inp); out = inp | op; outHint = inferType(out)
    inBad = checkF(inHint)(inp) is yieldT
    checkedOut = checkF(outHint)(out); outBad = checkedOut is yieldT
    final = checkF(op._typehint(inHint))(checkedOut); hintBad = final is yieldT
    if not (inBad or outBad or hintBad):
        return final
    tCheckData = [inHint, outHint, inBad, outBad, hintBad, inp, out]
    raise TypeHintException(f"Type hints are wrong. Hints: inp type ({inHint}), out type ({outHint}). Checks: {inBad}, {outBad}, {hintBad}. Inp: {inp}, out: {out}")
class tCheck(cli.BaseCli):
    def __init__(self):
        """Tool similar to :class:`~k1lib.cli.trace.trace` to check whether
        all type hint outputs of all clis are good or not. Example::

            assert range(1, 3) | tCheck() | item() | op()*2 == 2

        Mainly used in cli unit tests. The statement evaluates to a
        :class:`tCheck` object, which might be undesirable, so you can pipe it
        to :attr:`~k1lib.cli.init.yieldT` to get the actual value out::

            # returns tCheck object
            range(1, 3) | tCheck() | item() | op()*2
            # returns number "2"
            range(1, 3) | tCheck() | item() | op()*2 | yieldT"""
        self.inp = None

    def __ror__(self, v):
        """Captures the value piped in."""
        self.inp = v
        return self

    def __or__(self, op):
        # piping into yieldT unwraps the current value, anything else is run and checked
        if op is not yieldT:
            self.inp = _tCheck(self.inp, op)
            return self
        return self.inp

    def __eq__(self, v):
        return self.inp == v
class tOpt(cli.BaseCli):
    # registered passes, and the same passes grouped by abstractness level
    _passes = []; _serialPasses = []
    _passStruct = {}; _serialStruct = {}
    n = 10 # max number of rounds over all base passes
    def __init__(self):
        """Optimizes clis. Let's say you have something
        like this::

            range(1000) | toList() | head() | deref()

        For whatever reason you forgot that you've dereferenced everything
        in the middle, although you're only using 10 first elements, so the
        code can't be lazy anymore. You can apply optimizations to it like this::

            range(1000) | tOpt() | toList() | head() | deref()

        This will effectively turn it into this::

            range(1000) | tOpt() | head() | deref()

        Normally, you'd use it in this form instead::

            # returns the optimized cli
            f = "file.txt" | tOpt() | cat() | shape(0) | tOpt
            # then you can do this to pass it through as usual
            "other file.txt" | f

        Checkout the `llvm optimizer tutorial <llvm.html>`_ for a more in-depth explanation of this

        More over, this combines nicely with :class:`~k1lib.cli.trace.trace` like this::

            range(5) | tOpt() | trace() | apply(op()**2) | deref()"""
        self.inp = None; self.clis = []
        self._out = yieldT # yieldT means "not computed yet"

    @staticmethod
    def _addBasePass(p, abstractness=1):
        """Adds an optimization pass that acts upon a single cli.
        Example::

            def o1(c:BaseCli, t:tBase):
                if ...:
                    return aS(lambda x: x**2)
                else:
                    return None
            tOpt._addBasePass(o1, 6)

        :param p: the pass. Returns the replacement cli, or None for no change
        :param abstractness: rounded and clamped to the range [1, 2]"""
        tOpt._passes.append([p, round(max(min(abstractness, 2), 1))])
        # regroup by level, highest level first
        tOpt._passStruct = {a1: [q for q, a2 in tOpt._passes if a2 == a1] for a1 in range(2, 0, -1)}

    @staticmethod
    def addPass(p, klasses:List[cli.BaseCli]=[], abstractness=3):
        """Adds an optimization pass that acts upon multiple clis in series.
        Example::

            # cs: list of clis, ts: list of input type hints, 1 for each cli
            def o1(cs:List[BaseCli], ts:List[tBase], metadata={}):
                return [cs[1], cs[0]] # reorder the clis
            tOpt.addPass(o1, [toList, head], 3)

        Here, we're declaring an optimization pass ``o1``. You will be given a list of cli
        objects, the cli's input type hints and some extra metadata. If you can optimize
        it, then you should return a list of new clis, else you should return None

        Also, ``abstractness`` has varying number of legal values:

        - 1-5: generic optimizations
        - 6-10: analysis passes. Passes must not return anything

        Higher abstraction optimizations will be called first, and then lower abstraction
        optimizations will be called later. So, the idea is, just like LLVM, you can do
        some analysis which will compute metadata that you can use in your optimization
        passes, which will return optimized clis if it can.

        Within optimization passes, you can prioritize optimizations that look at the global
        picture first, before breaking the code up into tiny fragments with more detailed
        optimizations, at which point it's hard to look at the global picture.

        :param p: the optimization pass
        :param klasses: list of cli classes in series that will trigger the pass
        :param abstractness: how abstract is this optimization. Rounded and
            clamped to the range [1, 15]"""
        # `klasses` default is only ever read (copied into a tuple), never mutated
        tOpt._serialPasses.append([p, tuple(klasses), round(max(min(abstractness, 15), 1))])
        # regroup by level (highest first), then by triggering class sequence
        serialStruct = {}
        for a1 in range(15, 0, -1):
            bucket = defaultdict(list)
            for q, ks, a2 in tOpt._serialPasses:
                if a2 == a1: bucket[ks].append(q)
            serialStruct[a1] = bucket
        tOpt._serialStruct = serialStruct

    @staticmethod
    def clearPasses():
        """Clears all passes, then re-registers the built-in ones via ``addSerialOpt``"""
        tOpt._passes = []; tOpt._serialPasses = []
        tOpt._passStruct = {}; tOpt._serialStruct = {}
        addSerialOpt()

    @property
    def out(self):
        """Result of piping the input through the optimized clis. Computed on
        first access and cached afterwards."""
        # identity test, not `==`: a cached result like a numpy array doesn't give
        # a usable boolean when compared with `==`
        if self._out is yieldT:
            if isinstance(self.inp, cli.BaseCli):
                # no real input given, the first thing piped in was a cli
                self.clis = [self.inp, *self.clis]; self.inp = None
            # why wrap 2 times? We want passes to select klasses=[serial]
            c = cli.serial(cli.serial(*self.clis)); t = inferType(self.inp)
            # start optimization passes here
            for i in range(tOpt.n):
                atLeastOnce = False
                for passes in tOpt._passStruct.values():
                    for p in passes:
                        repl = p(c, t)
                        if repl is not None: atLeastOnce = True; c = repl # optimized version
                if not atLeastOnce: break # nothing changed this round, so done
            assert isinstance(c, cli.serial) and len(c.clis) == 1
            self._optCli = c.clis[0]; self._out = self.inp | c
        return self._out

    @property
    def optCli(self):
        """Grabs the optimized cli.
        Example::

            # returns optimized cli
            (range(5) | tOpt() | apply(op()**2) | deref()).optCli
            # you can also do it like this:
            range(5) | tOpt() | apply(op()**2) | deref() | tOpt.optCli
            # or even shorter like this:
            range(5) | tOpt() | apply(op()**2) | deref() | tOpt
        """
        self.out; return self._optCli # accessing .out runs the optimization

    def __ror__(self, it):
        """Captures the input piped in."""
        self.inp = it; return self

    def __iter__(self): return iter(self.out)

    def __or__(self, o):
        if o is yieldT: return self.out
        if o is tOpt.optCli or o is tOpt:
            return self.optCli
        self.clis.append(o); return self # just record the cli, run it later

    def __repr__(self): return f"{self.out}"

    def __eq__(self, v): return self.out == v

    def __bool__(self): return bool(self.out) # __bool__ has to return an actual bool
class window(cli.BaseCli):
    def __init__(self, n, newList=False):
        """Sliding window over an iterable, internal to the optimizer.

        :param n: window size
        :param newList: accepted but not used"""
        self.n = n

    def __ror__(self, it):
        """Yields ``(before, window, rest)`` for each full window, where
        ``before`` is the list of elements that already slid out (same list
        object every time, grows as the window moves), ``window`` is a tuple
        of ``n`` elements and ``rest`` is the live iterator over what's left."""
        size = self.n; seen = []; buf = deque([], size)
        stream = iter(it)
        for e in stream:
            buf.append(e)
            if len(buf) != size: continue
            yield seen, tuple(buf), stream
            seen.append(buf.popleft())
def grabTypes(cs, t):
    """Propagates input type ``t`` through the clis ``cs`` in order, using
    each cli's ``_typehint``.

    :return: list with ``len(cs) + 1`` types: ``t`` first, then the output
        type after each cli"""
    hints = [t]
    for c in cs:
        hints.append(c._typehint(hints[-1]))
    return hints
def grabKlasses(iKlasses):
    """Returns the list of classes of the given objects."""
    return list(map(type, iKlasses))
# Debug state of `serialOpt`: current nesting depth, and whether to print traces
depth = 0
debug = False
def serialOpt(c, t, metadata=None):
    """Optimizes ``c``, which is supposed to be a :class:`~init.serial`
    object, with the input type hint ``t``.

    Slides windows of growing size over the clis inside ``c`` and applies the
    first registered serial pass (in ``tOpt._serialStruct``) that both matches
    the window's class sequence and returns a replacement.

    :param c: the cli to optimize
    :param t: type hint of the input going into ``c``
    :param metadata: dict shared with the passes. ``"serial"`` is pushed onto
        ``metadata["route"]`` while this function is working
    :return: a new :class:`~init.serial` object if a pass fired, else None"""
    global depth
    if debug: depth += 1; print(f"serial depth: {depth}")
    if metadata is None: metadata = {"route": []}
    # returns None, or a new serial object
    if not isinstance(c, cli.serial):
        if debug: print(f"out depth, not serial: {depth}"); depth -= 1
        return None
    metadata["route"].append("serial")
    cs = c.clis; ts = grabTypes(cs, t)
    if debug: print(f"serialOpt: {[e.__class__.__name__ for e in cs]}, {ts}")
    for windowSize in range(1, len(cs)+1):
        # each element is a (cli, input type) pair
        for before, win, after in [cs, ts] | cli.transpose() | window(windowSize):
            iKlasses, ths = win | cli.transpose()
            klasses = tuple(type(k) for k in iKlasses)
            for bucket in tOpt._serialStruct.values():
                if klasses not in bucket: continue
                for p in bucket[klasses]:
                    res = p(iKlasses, ths, metadata)
                    if res is None: continue
                    # pass fired: stitch untouched clis back around the replacement
                    before = before | cli.toList(); after = after | cli.toList()
                    if debug: print(f"out depth new: {depth}"); depth -= 1
                    metadata["route"].pop()
                    return cli.serial(
                        *(before | cli.transpose() | cli.item() if len(before) > 0 else []),
                        *res,
                        *(after | cli.transpose() | cli.item() if len(after) > 0 else []))
    if debug: print(f"out depth none: {depth}"); depth -= 1
    metadata["route"].pop()
def addSerialOpt():
    """Registers the built-in passes: :func:`serialOpt` as a base pass, a
    serial pass that applies :func:`serialOpt` to nested ``serial`` clis, and
    (best effort) the ones from ``cli.optimizations.basics()``."""
    tOpt._addBasePass(serialOpt, 5)
    def inner(cs, ts, metadata):
        # adapts serialOpt (single cli) to the serial pass interface (list of clis)
        res = serialOpt(cs[0], ts[0], metadata)
        return None if res is None else [res]
    tOpt.addPass(inner, [cli.serial], 15)
    # cyclic include, so mainly intended for regular use after first initialization.
    # Best effort on purpose, but don't swallow KeyboardInterrupt/SystemExit
    try: cli.optimizations.basics()
    except Exception: pass
# Start with a clean registry, which also registers the built-in passes
tOpt.clearPasses()