# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
"""
This module wraps advanced machine learning models and complex data structures and algorithms behind a digestible interface
"""
from typing import Callable, Union, List, overload, Iterator, Any, Set, Tuple
from k1lib.cli.init import BaseCli, fastF
import k1lib.cli as cli; import k1lib, os, math
from k1lib.cli.typehint import *
import numpy as np; from collections import deque
try: import torch; hasTorch = True
except: hasTorch = False
__all__ = ["embed", "complete", "kmeans", "tsne", "bloom"]
settings = k1lib.Settings().add("cuda", None, "whether to run the models on the GPU or not. True for GPU, False for CPU. None (default) for GPU if available, else CPU")
k1lib.settings.cli.add("models", settings, "settings related to k1lib.cli.models");
_cuda = k1lib.Wrapper(None)
def cuda() -> bool: # internal func to figure out whether the funcs should run on gpus or not # cuda
if _cuda() is None: _cuda.value = hasTorch and torch.cuda.is_available() and torch.cuda.device_count() >= 1 # cuda
return _cuda() # cuda
sentence_transformers = k1lib.dep("sentence_transformers", url="https://www.sbert.net/"); embed_models_cpu = dict(); embed_models_cuda = dict() # cuda
def embed_models(): return embed_models_cuda if cuda() else embed_models_cpu # embed_models
settings.add("embed", k1lib.Settings().add("model", "all-MiniLM-L6-v2", "what model to choose from `SentenceTransformer` library").add("bs", 512, "batch size to feed the model. For all-MiniLM-L6-v2, it seems to be able to deal with anything. I've tried 10k batch and it's still doing good")) # embed_models
def embed_model(): # returns correct function capable of passing in List[str] and will spit out np.ndarray with shape (N, F) # embed_model
modelName = settings.embed.model # embed_model
if modelName not in embed_models(): # embed_model
model = sentence_transformers.SentenceTransformer(modelName) # embed_model
if cuda(): model = model.cuda() # embed_model
embed_models()[modelName] = model.encode # embed_model
return embed_models()[modelName] # embed_model
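# A small usage sketch (assumed, requires sentence_transformers plus the default model weights): embed_model()
# hands back SentenceTransformer.encode, so it can also be called directly on a list of strings, e.g.
#   vecs = embed_model()(["hello world", "goodbye world"])   # expected np.ndarray of shape (2, 384) for all-MiniLM-L6-v2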
class embed(BaseCli): # embed
 def __init__(self, norm=True): # embed
"""Gets an embedding vector for every sentence piped into this.
Example::
# returns (384,)
"abc" | embed() | shape()
# returns (2, 384). Don't worry that this looks less performant: behind the scenes, it will automatically batch all lines together and pass them through the model in large batches
["abc", "def"] | embed().all() | shape()
There are several settings you can set::
settings.cli.models.embed.model = "msmarco-distilbert-base-v4" # specifies model used in this function
settings.cli.models.cuda = False # tells the system to only use the CPU to run the models
I'd suggest picking "all-MiniLM-L6-v2" for general purpose tasks, and "msmarco-distilbert-base-v4"
for document-lookup style applications.
:param norm: whether to standardize (subtract the mean, divide by the std) the output embeddings or not""" # embed
self.model = embed_model(); self.normF = (lambda x: (x - (x | cli.toMean())) / (x | cli.toStd())) if norm else (lambda x: x) # embed
 def __ror__(self, it): return self.normF(self.model([it])[0] if isinstance(it, str) else self.model(list(it))) # embed
def _all_opt(self, it:List[str]): return it | cli.batched(settings.embed.bs, True) | cli.apply(self.__ror__) | cli.joinStreams() # embed
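# Note (a descriptive sketch, assuming default settings): embed().all() routes through _all_opt above, so
#   lines | embed().all() | shape()
# groups `lines` into batches of settings.embed.bs (512 by default) and calls the model once per batch
# instead of once per line, which is what makes the .all() form cheap.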
transformers = k1lib.dep("transformers", url="https://huggingface.co/docs/transformers/en/index"); generic_models_cpu = dict(); generic_models_cuda = dict() # embed
def generic_models(): return generic_models_cuda if cuda() else generic_models_cpu # generic_models
settings.add("generic", k1lib.Settings().add("model", "google/flan-t5-xl", "what model to choose from `transformers` library").add("bs", 16, "batch size to feed the model. For flan-t5-xl, 16 seems to be the sweet spot for 24GB VRAM (RTX 3090/4090). Decrease it if you don't have as much VRAM")) # generic_models
def generic_model(maxTokens=100): # returns correct function capable of passing in str|List[str] and will spit out List[str] # generic_model
modelName = settings.generic.model # generic_model
if modelName not in generic_models(): # generic_model
if modelName.startswith("google/flan-t5-"): # generic_model
tokenizer = transformers.T5Tokenizer.from_pretrained(modelName, **({"device_map": "auto"} if cuda() else {})) # generic_model
model = transformers.T5ForConditionalGeneration.from_pretrained(modelName, **({"device_map": "auto"} if cuda() else {})) # generic_model
conf = transformers.GenerationConfig(max_new_tokens=maxTokens); cuda_ = cuda() # generic_model
# if cuda_: model = model.cuda() # generic_model
def inner(it): # generic_model
inputs = tokenizer(list(it), return_tensors="pt", padding=True).input_ids # generic_model
if cuda_: inputs = inputs.cuda() # generic_model
return (tokenizer.decode(line) for line in model.generate(inputs, conf)) # generic_model
generic_models()[modelName] = inner # generic_model
else: raise Exception("Currently, only Google T5 Flan models are supported") # generic_model
return generic_models()[modelName] # generic_model
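# Usage sketch (assumed, requires transformers plus the flan-t5 weights): generic_model() returns a callable
# that maps an iterable of prompts to a generator of decoded strings, still containing special tokens, e.g.
#   list(generic_model()(["Translate to German: How are you?"]))   # -> something like ['<pad> Wie geht es dir?</s>']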
class complete(BaseCli): # complete
 def __init__(self, prompt:str=""): # complete
"""Uses a LLM to autocomplete something.
Example::
# returns "4". In case you're living in a cage, these LLMs are not entirely math savants. They sure understand English though
"What is 2 + 6?" | complete()
# returns ["4", "4"]
["What is 2 + 6?", "What is 8 + 2?"] | complete().all() | deref()
Can change model type by doing ``settings.cli.models.generic.model = "google/flan-t5-xl"``
:param prompt: if specified, will inject this bit of text after all of the inputs.
Can be something like "Please translate the above paragraph to German"
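A sketch of how ``prompt`` might be used (illustrative only, the actual output depends on the model)::
# may return something like "Wie geht es dir?"
"How are you?" | complete("Please translate the above paragraph to German")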
""" # complete
self.model = generic_model(); self.prompt = prompt # complete
 def __ror__(self, it): # complete
arrMode = not isinstance(it, str); prompt = self.prompt # complete
it = (list(it) if arrMode else [it]) | cli.apply(lambda x: f"{x}\n\n\n{prompt}: ") | cli.deref() # complete
ans = self.model(it) | cli.apply(lambda x: x.replace("<pad>", "").replace("<unk>", "").replace("</s>", "").strip()) # complete
return ans if arrMode else ans | cli.item() # complete
def _all_opt(self, it:List[str]): # complete
return it | cli.batched(settings.generic.bs, True) | cli.apply(self.__ror__) | cli.joinStreams() # complete
skclus = k1lib.dep("sklearn.cluster", url="https://scikit-learn.org/") # complete
skpre = k1lib.dep("sklearn.preprocessing", url="https://scikit-learn.org/") # complete
skmet = k1lib.dep("sklearn.metrics", url="https://scikit-learn.org/") # complete
def refine(fea, a, b, kwargs, timeout=1): # refine
scores = []; topScore = float("-inf") # refine
for k in torch.loglinspace(a, b, 10).numpy().astype(int) | cli.aS(np.unique): # refine
km = skclus.KMeans(n_clusters=k, **{**{"init": "k-means++", "n_init": 10, "max_iter": 30}, **kwargs}) # refine
try: [fea] | cli.applyTh(km.fit, timeout=timeout) | cli.ignore() # refine
except: break # refine
score = skmet.silhouette_score(fea, km.labels_) # refine
topScore = max(topScore, score); scores.append([k, score]) # refine
if score*2 < topScore: break # refine
return scores | ~cli.sort(1) | cli.cut(0) | cli.item(), k # refine
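# How `refine` narrows the k search (descriptive note): it tries up to 10 candidate k values log-spaced in
# [a, b], fits a quick KMeans for each (killed after `timeout` seconds), scores the clustering with the
# silhouette score, stops early once a score drops below half of the best seen, and returns
# (best k so far, last k tried).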
def findCenters(fea, c, kwargs, timeout=1): # findCenters
if c is None: # findCenters
a = 2; b = len(fea); c, b = refine(fea, a, b, kwargs, timeout); a = round(a*0.2 + c*0.8); b = round(b*0.5 + c*0.5) # findCenters
while b - a > 3: c, b = refine(fea, a, b, kwargs, timeout); a = round(a*0.2 + c*0.8); b = round(b*0.5 + c*0.5) # findCenters
km = skclus.KMeans(n_clusters=c, init="k-means++", n_init=10, max_iter=100) # findCenters
km.fit(fea); return km.cluster_centers_, km.labels_ # findCenters
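# findCenters note: when no k is given, it repeatedly calls refine() to shrink the bracket [a, b] around the
# current best k until the bracket is at most 3 wide, then fits one final KMeans with that k and returns
# its cluster centers and labels.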
class kmeans(BaseCli): # kmeans
 def __init__(self, k=None, mode=1, timeout=1, **kwargs): # kmeans
"""Do k-means clustering, returning the cluster centers.
Example::
features, true_labels = sklearn.datasets.make_blobs(n_samples=1_000, centers=5, cluster_std=0.5)
centers = features | kmeans();
centers | shape() # likely returns (5, 2)
# plotting things out
plt.plot(*features.T, ".")
plt.plot(*centers.T, ".")
.. image:: ../images/kmeans.png
:param k: if specified, will use this k value. Else tries to guess what the best value is
:param mode: mode 0 (returns [cluster centers, labels]), mode 1 (returns cluster centers only), mode 2 (returns labels only)
:param timeout: internally will try kmeans for up to this number of seconds only.
Will kill the job if it's taking longer
:param kwargs: keyword arguments will be passed into sklearn.cluster.KMeans directly.
Some interesting parameters include ``init``, ``n_init``, ``max_iter``
""" # kmeans
self.k = k; self.mode = mode; self.timeout = timeout; self.kwargs = kwargs # kmeans
 def __ror__(self, it): # kmeans
scaler = skpre.StandardScaler(); fea = scaler.fit_transform(it); mode = self.mode # kmeans
centers, labels = findCenters(fea, self.k, self.kwargs, self.timeout) # kmeans
centers = scaler.inverse_transform(centers) # kmeans
if mode == 0: return [centers, labels] # kmeans
return centers if mode == 1 else labels # kmeans
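# Usage sketch (assumed): grabbing both centers and labels in one pass
#   centers, labels = features | kmeans(mode=0)
#   labels | shape()    # expected (N,), one cluster id per input row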
skmani = k1lib.dep("sklearn.manifold", url="https://scikit-learn.org/") # kmeans
class tsne(BaseCli): # tsne
 def __init__(self, n=2, **kwargs): # tsne
"""Transforms feature vectors of shape (N, F) down to (N, 2) for easy plotting.
Example::
from sklearn.datasets import make_blobs
features, true_labels = make_blobs(n_samples=1_000, n_features=5, centers=5, cluster_std=0.2)
features | shape() # returns (1000, 5)
features | tsne() | shape() # returns (1000, 2)
# plotting things out with nice colors and whatnot
features | tsne() & kmeans(5, 2) | ~aS(lambda xy,c: plt.scatter(*xy.T,c=c))
.. image:: ../images/tsne.png
:param n: number of output components (aka size of feature vector)
:param kwargs: other keyword arguments passed into ``sklearn.manifold.TSNE``""" # tsne
self.n = n; self.kwargs = kwargs # tsne
 def __ror__(self, it): # tsne
if not isinstance(it, k1lib.settings.cli.arrayTypes): it = np.array(list(it)) # tsne
return skmani.TSNE(self.n, **self.kwargs).fit_transform(it) # tsne
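# Note (assuming a recent scikit-learn): the default TSNE method ("barnes_hut") only supports
# n_components <= 3; for a larger `n`, pass method="exact" through kwargs, e.g. features | tsne(4, method="exact")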
settings.add("bloom", k1lib.Settings().add("scalable", k1lib.Settings().add("capacity", 1000, "initial filter's capacity").add("growth", 4, "how fast does the filter's capacity grow over time when the capacity is reached"), "settings for when you don't declare the bloom's capacity ahead of time"), "bloom filter settings") # tsne
pybloom_live = k1lib.dep("pybloom_live", "pybloom-live", "https://github.com/joseph-fox/python-bloomfilter, https://pypi.org/project/pybloom-live/") # tsne
class bloom(BaseCli): # bloom
 def __init__(self, n:int=None, p:float=0.1, overflow:bool=False): # bloom
"""Creates a bloom filter.
Example::
bf = ["raptor", "skylake", "merlin", "twinscan nxt", "sapphire rapids"] | bloom()
"raptor" in bf # returns True
"twinscan nxt" in bf # returns True
"twin" in bf # most likely returns False, small chance returns True
This also allows distributed computing quite easily::
bf = range(10) | applyMp(lambda i: cat(f"file-{i}.txt") | bloom()) | bloom.join()
This code assumes that you have 10 text files named "file-0.txt" through "file-9.txt" and
you want to check whether a string exists in any of those files. Each process builds a filter
for its own file, and ``bloom.join()`` then merges them into a single filter.
.. admonition:: Scalable bloom filter
It's possible to leave the filter's capacity empty, which will create an initial filter
with capacity 1000. When that capacity is reached, it will expand the filter to a capacity
of 4000. Then 16000, and so on. This can be tweaked in the settings::
settings.cli.models.bloom.scalable.capacity = 2000 # sets filter's default initial capacity
settings.cli.models.bloom.scalable.growth = 2 # sets filter's growth factor when it runs out of space
Because bloom filters fundamentally can't grow, this will internally create multiple bloom
filters with increasing capacities, and every lookup has to search through all of them to get
the answer. So even though you can leave the capacity empty, doing so degrades performance
a little bit, which might be undesirable.
:param n: number of elements to be stored, aka filter's capacity
:param p: false positive probability (put 0.1 for 10% false positive, 0.01 for 1%)
:param overflow: if True, allows appending more elements than the filter's capacity; else (default) raises an error once the capacity is exceeded""" # bloom
self.n = n; self.p = p; self.overflow = overflow # bloom
 def __ror__(self, it): # bloom
n = self.n; p = self.p # bloom
bf = pybloom_live.ScalableBloomFilter(settings.bloom.scalable.capacity, p, settings.bloom.scalable.growth) if n is None else pybloom_live.BloomFilter(n, p) # bloom
if self.overflow: # bloom
for e in it: # bloom
try: bf.add(e) # bloom
except IndexError: bf.count = 0; bf.add(e) # bloom
else: # bloom
for e in it: bf.add(e) # bloom
return bf # bloom
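# Note on the overflow branch above: pybloom_live's BloomFilter raises IndexError once its element count
# reaches capacity; resetting `count` to 0 lets more elements in, at the cost of a false-positive rate
# higher than the requested `p`.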
 @staticmethod # bloom
def join(): # bloom
def inner(bfs): # bloom
bfs = list(bfs) # bloom
if len(set([type(b) for b in bfs])) > 1: raise Exception("Can't join normal filters and scalable filters together. Please specify a common capacity to all of the bloom filters") # bloom
if type(bfs[0]) == pybloom_live.BloomFilter: # my implementation, which should be a little faster than builtin .union() # bloom
b = bfs[0].copy() # bloom
for b_ in bfs[1:]: b.bitarray |= b_.bitarray # bloom
return b # bloom
else: # has to use complex .union() in this case # bloom
b = bfs[0] # bloom
for b_ in bfs[1:]: b = b.union(b_) # bloom
return b # bloom
return cli.aS(inner) # bloom
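# Note: OR-ing the bitarrays only makes sense when every filter was built with the same capacity and error
# rate (so they share bit-array length and hash functions); filters created by bloom(n, p) with identical
# arguments satisfy this, which is why the error message above asks for a common capacity.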
try: # bloom
import pybloom_live # bloom
@k1lib.patch(pybloom_live.pybloom.BloomFilter) # bloom
def __or__(self, other): # bloom
if isinstance(other, BaseCli): return other.__ror__(self) # bloom
return self.union(other) # bloom
except: pass # bloom