Source code for k1lib.cli.models

# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
"""
This module provides advanced machine learning models and complex data structures and algorithms, presented behind a digestible interface
"""
from typing import Callable, Union, List, overload, Iterator, Any, Set, Tuple
from k1lib.cli.init import BaseCli, fastF
import k1lib.cli as cli; import k1lib, os, math
from k1lib.cli.typehint import *
import numpy as np; from collections import deque
try: import torch; hasTorch = True
except: hasTorch = False
__all__ = ["embed", "complete", "kmeans", "tsne", "bloom"]
settings = k1lib.Settings().add("cuda", None, "whether to run the models on the GPU or not. True for GPU, False for CPU. None (default) for GPU if available, else CPU")
k1lib.settings.cli.add("models", settings, "settings related to k1lib.cli.models");
_cuda = k1lib.Wrapper(None)
def cuda() -> bool: # internal func to figure out whether the funcs should run on gpus or not # cuda
    if _cuda() is None: _cuda.value = settings.cuda if settings.cuda is not None else (hasTorch and torch.cuda.is_available() and torch.cuda.device_count() >= 1) # cuda
    return _cuda()                                                               # cuda
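# A minimal sketch of how the GPU/CPU switch above is meant to be used (illustrative only,
# relying just on the settings registered above):
#
#     k1lib.settings.cli.models.cuda = False  # force every model in this module onto the CPU
#     k1lib.settings.cli.models.cuda = True   # force GPU usage
#     k1lib.settings.cli.models.cuda = None   # default: GPU if torch reports one, else CPU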
sentence_transformers = k1lib.dep("sentence_transformers", url="https://www.sbert.net/"); embed_models_cpu = dict(); embed_models_cuda = dict() # cuda
def embed_models(): return embed_models_cuda if cuda() else embed_models_cpu     # embed_models
settings.add("embed", k1lib.Settings().add("model", "all-MiniLM-L6-v2", "what model to choose from `SentenceTransformer` library").add("bs", 512, "batch size to feed the model. For all-MiniLM-L6-v2, it seems to be able to deal with anything. I've tried 10k batch and it's still doing good")) # embed_models
def embed_model(): # returns a cached function that takes a List[str] and returns an np.ndarray of shape (N, F) # embed_model
    modelName = settings.embed.model                                             # embed_model
    if modelName not in embed_models():                                          # embed_model
        model = sentence_transformers.SentenceTransformer(modelName)             # embed_model
        if cuda(): model = model.cuda()                                          # embed_model
        embed_models()[modelName] = model.encode                                 # embed_model
    return embed_models()[modelName]                                             # embed_model
[docs]class embed(BaseCli): # embed
[docs] def __init__(self, norm=True):                                            # embed
        """Gets an embedding vector for every sentence piped into this. Example::

            # returns (384,)
            "abc" | embed() | shape()
            # returns (2, 384). Don't worry that this is less performant, as behind the scenes,
            # it will automatically batch all lines together and pass through the model only once
            ["abc", "def"] | embed().all() | shape()

        There are several settings you can set::

            settings.cli.models.embed.model = "msmarco-distilbert-base-v4" # specifies model used in this function
            settings.cli.models.cuda = False # tells the system to only use the CPU to run the models

        I'd suggest picking "all-MiniLM-L6-v2" for general-purpose tasks, and "msmarco-distilbert-base-v4"
        for document-lookup style applications.

        :param norm: whether to normalize the output embeddings or not"""        # embed
        self.model = embed_model(); self.normF = (lambda x: (x - (x | cli.toMean())) / (x | cli.toStd())) if norm else (lambda x: x) # embed
[docs] def __ror__(self, it): return self.normF(self.model([it])[0] if isinstance(it, str) else self.model(list(it))) # embed
    def _all_opt(self, it:List[str]): return it | cli.batched(settings.embed.bs, True) | cli.apply(self.__ror__) | cli.joinStreams() # embed
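# A minimal usage sketch for ``embed`` (illustrative only, not part of the library source;
# it assumes sentence_transformers is installed and the model weights can be downloaded):
def _embed_demo():
    sentences = ["the quick brown fox", "jumps over the lazy dog"]
    vecs = sentences | embed().all() | cli.deref() # one vector per sentence, batched behind the scenes
    return vecs | cli.shape()                      # expected to be (2, 384) for all-MiniLM-L6-v2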
transformers = k1lib.dep("transformers", url="https://huggingface.co/docs/transformers/en/index"); generic_models_cpu = dict(); generic_models_cuda = dict() # embed
def generic_models(): return generic_models_cuda if cuda() else generic_models_cpu # generic_models
settings.add("generic", k1lib.Settings().add("model", "google/flan-t5-xl", "what model to choose from `transformers` library").add("bs", 16, "batch size to feed the model. For flan-t5-xl, 16 seems to be the sweet spot for 24GB VRAM (RTX 3090/4090). Decrease it if you don't have as much VRAM")) # generic_models
def generic_model(maxTokens=100): # returns correct function capable of passing in str|List[str] and will spit out List[str] # generic_model
    modelName = settings.generic.model                                           # generic_model
    if modelName not in generic_models():                                        # generic_model
        if modelName.startswith("google/flan-t5-"):                              # generic_model
            tokenizer = transformers.T5Tokenizer.from_pretrained(modelName, **({"device_map": "auto"} if cuda() else {})) # generic_model
            model = transformers.T5ForConditionalGeneration.from_pretrained(modelName, **({"device_map": "auto"} if cuda() else {})) # generic_model
            conf = transformers.GenerationConfig(max_new_tokens=maxTokens); cuda_ = cuda() # generic_model
            # if cuda_: model = model.cuda()                                     # generic_model
            def inner(it):                                                       # generic_model
                inputs = tokenizer(list(it), return_tensors="pt", padding=True).input_ids # generic_model
                if cuda_: inputs = inputs.cuda()                                 # generic_model
                return (tokenizer.decode(line) for line in model.generate(inputs, conf)) # generic_model
            generic_models()[modelName] = inner                                  # generic_model
        else: raise Exception("Currently, only Google T5 Flan models are supported") # generic_model
    return generic_models()[modelName]                                           # generic_model
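# A minimal sketch of calling ``generic_model`` directly, without going through ``complete``
# (illustrative only, not part of the library source; it assumes transformers is installed and
# the flan-t5 weights can be downloaded):
def _generic_model_demo():
    f = generic_model(maxTokens=20)                          # cached per model name, so later calls reuse the loaded weights
    return list(f(["Translate to German: good morning"]))    # a list with one decoded string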
[docs]class complete(BaseCli): # complete
[docs] def __init__(self, prompt:str=""):                                        # complete
        """Uses an LLM to autocomplete something. Example::

            # returns "4". In case you're living in a cage, these LLMs are not entirely math savants. They sure understand English though
            "What is 2 + 6?" | complete()
            # returns ["4", "4"]
            ["What is 2 + 6?", "What is 8 + 2?"] | complete().all() | deref()

        Can change model type by doing ``settings.cli.models.generic.model = "google/flan-t5-xl"``

        :param prompt: if specified, will inject this bit of text after all of the inputs. Can be
            something like "Please translate the above paragraph to German" """  # complete
        self.model = generic_model(); self.prompt = prompt                       # complete
[docs] def __ror__(self, it):                                                    # complete
        arrMode = not isinstance(it, str); prompt = self.prompt                  # complete
        it = (list(it) if arrMode else [it]) | cli.apply(lambda x: f"{x}\n\n\n{prompt}: ") | cli.deref() # complete
        ans = self.model(it) | cli.apply(lambda x: x.replace("<pad>", "").replace("<unk>", "").replace("</s>", "").strip()) # complete
        return ans if arrMode else ans | cli.item()                              # complete
    def _all_opt(self, it:List[str]):                                            # complete
        return it | cli.batched(settings.generic.bs, True) | cli.apply(self.__ror__) | cli.joinStreams() # complete
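# A minimal usage sketch for ``complete`` with a shared prompt (illustrative only, not part of
# the library source; it assumes transformers is installed and the flan-t5 weights are available):
def _complete_demo():
    questions = ["What is 2 + 6?", "What is the capital of France?"]
    # the prompt is appended after each input, as described in the docstring above
    return questions | complete("Answer concisely").all() | cli.deref()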
skclus = k1lib.dep("sklearn.cluster", url="https://scikit-learn.org/")           # complete
skpre = k1lib.dep("sklearn.preprocessing", url="https://scikit-learn.org/")      # complete
skmet = k1lib.dep("sklearn.metrics", url="https://scikit-learn.org/")            # complete
def refine(fea, a, b, kwargs, timeout=1):                                        # refine
    scores = []; topScore = float("-inf")                                        # refine
    for k in torch.loglinspace(a, b, 10).numpy().astype(int) | cli.aS(np.unique): # refine
        km = skclus.KMeans(n_clusters=k, **{**{"init": "k-means++", "n_init": 10, "max_iter": 30}, **kwargs}) # refine
        try: [fea] | cli.applyTh(km.fit, timeout=timeout) | cli.ignore()         # refine
        except: break                                                            # refine
        score = skmet.silhouette_score(fea, km.labels_)                          # refine
        topScore = max(topScore, score); scores.append([k, score])               # refine
        if score*2 < topScore: break                                             # refine
    return scores | ~cli.sort(1) | cli.cut(0) | cli.item(), k                    # refine
def findCenters(fea, c, kwargs, timeout=1):                                      # findCenters
    if c is None:                                                                # findCenters
        a = 2; b = len(fea); c, b = refine(fea, a, b, kwargs, timeout); a = round(a*0.2 + c*0.8); b = round(b*0.5 + c*0.5) # findCenters
        while b - a > 3: c, b = refine(fea, a, b, kwargs, timeout); a = round(a*0.2 + c*0.8); b = round(b*0.5 + c*0.5) # findCenters
    km = skclus.KMeans(n_clusters=c, init="k-means++", n_init=10, max_iter=100)  # findCenters
    km.fit(fea); return km.cluster_centers_, km.labels_                          # findCenters
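# ``refine`` above picks k by scanning log-spaced candidates and keeping the one with the best
# silhouette score. A stripped-down sketch of the same idea using sklearn directly (illustrative
# only, not part of the library source; assumes sklearn is installed):
def _silhouette_pick_k(fea, candidates=(2, 4, 8, 16)):
    best_k, best_score = None, float("-inf")
    for k in candidates:
        km = skclus.KMeans(n_clusters=k, init="k-means++", n_init=10, max_iter=30).fit(fea)
        score = skmet.silhouette_score(fea, km.labels_) # higher means tighter, better-separated clusters
        if score > best_score: best_k, best_score = k, score
    return best_k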
[docs]class kmeans(BaseCli): # kmeans
[docs] def __init__(self, k=None, mode=1, timeout=1, **kwargs):                  # kmeans
        """Do k-means clustering, returning the cluster centers. Example::

            features, true_labels = sklearn.datasets.make_blobs(n_samples=1_000, centers=5, cluster_std=0.5)
            centers = features | kmeans(); centers | shape() # likely returns (5, 2)

            # plotting things out
            plt.plot(*features.T, ".")
            plt.plot(*centers.T, ".")

        .. image:: ../images/kmeans.png

        :param k: if specified, will use this k value. Else tries to guess what the best value is
        :param mode: mode 0 (returns [cluster centers, labels]), mode 1 (returns cluster centers only), mode 2 (returns labels only)
        :param timeout: internally will try kmeans for up to this number of seconds only. Will kill the job if it's taking longer
        :param kwargs: keyword arguments will be passed into sklearn.cluster.KMeans directly. Some
            interesting parameters include ``init``, ``n_init``, ``max_iter`` """ # kmeans
        self.k = k; self.mode = mode; self.timeout = timeout; self.kwargs = kwargs # kmeans
[docs] def __ror__(self, it):                                                    # kmeans
        scaler = skpre.StandardScaler(); fea = scaler.fit_transform(it); mode = self.mode # kmeans
        centers, labels = findCenters(fea, self.k, self.kwargs, self.timeout)    # kmeans
        centers = scaler.inverse_transform(centers)                              # kmeans
        if mode == 0: return [centers, labels]                                   # kmeans
        return centers if mode == 1 else labels                                  # kmeans
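# A minimal usage sketch for the ``mode`` parameter of ``kmeans`` (illustrative only, not part
# of the library source; ``features`` is any (N, F) array, e.g. from sklearn's make_blobs):
def _kmeans_demo(features):
    centers, labels = features | kmeans(k=5, mode=0) # both cluster centers and per-point labels
    labels_only = features | kmeans(k=5, mode=2)     # just the labels, one per input row
    return centers, labels, labels_only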
skmani = k1lib.dep("sklearn.manifold", url="https://scikit-learn.org/") # kmeans
[docs]class tsne(BaseCli): # tsne
[docs] def __init__(self, n=2, **kwargs):                                        # tsne
        """Transforms feature vectors of shape (N, F) down to (N, 2) for easy plotting. Example::

            from sklearn.datasets import make_blobs
            features, true_labels = make_blobs(n_samples=1_000, n_features=5, centers=5, cluster_std=0.2)
            features | shape()          # returns (1000, 5)
            features | tsne() | shape() # returns (1000, 2)

            # plotting things out with nice colors and whatnot
            features | tsne() & kmeans(5, 2) | ~aS(lambda xy,c: plt.scatter(*xy.T,c=c))

        .. image:: ../images/tsne.png

        :param n: number of output components (aka size of feature vector)
        :param kwargs: other keyword arguments passed into ``sklearn.manifold.TSNE``""" # tsne
        self.n = n; self.kwargs = kwargs                                         # tsne
[docs] def __ror__(self, it):                                                    # tsne
        if not isinstance(it, k1lib.settings.cli.arrayTypes): it = np.array(list(it)) # tsne
        return skmani.TSNE(self.n, **self.kwargs).fit_transform(it)              # tsne
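# A minimal sketch combining ``tsne`` and ``kmeans`` for a colored scatter plot, mirroring the
# docstring example above (illustrative only, not part of the library source; assumes matplotlib
# and sklearn are installed, and that 5 is a reasonable cluster count for ``features``):
def _tsne_demo(features):
    import matplotlib.pyplot as plt
    xy = features | tsne()                # (N, 2) projection of the (N, F) features
    labels = features | kmeans(5, 2)      # mode 2: one cluster label per point, used as color
    plt.scatter(*xy.T, c=labels)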
settings.add("bloom", k1lib.Settings().add("scalable", k1lib.Settings().add("capacity", 1000, "initial filter's capacity").add("growth", 4, "how fast does the filter's capacity grow over time when the capacity is reached"), "settings for when you don't declare the bloom's capacity ahead of time"), "bloom filter settings") # tsne pybloom_live = k1lib.dep("pybloom_live", "pybloom-live", "https://github.com/joseph-fox/python-bloomfilter, https://pypi.org/project/pybloom-live/") # tsne
[docs]class bloom(BaseCli): # bloom
[docs] def __init__(self, n:int=None, p:float=0.1, overflow:bool=False):         # bloom
        """Creates a bloom filter. Example::

            bf = ["raptor", "skylake", "merlin", "twinscan nxt", "sapphire rapids"] | bloom()
            "raptor" in bf       # returns True
            "twinscan nxt" in bf # returns True
            "twin" in bf         # most likely returns False, small chance returns True

        This also allows distributed computing quite easily::

            bf = range(10) | applyMp(lambda i: cat(f"file-{i}.txt") | bloom()) | bloom.join()

        This code assumes that you have 10 files filled with text, with file names "file-0.txt"
        through "file-9.txt", and you want to check whether a string exists in any of those files.

        .. admonition:: Scalable bloom filter

            It's possible to leave the filter's capacity empty, which will create an initial filter
            with capacity 1000. When that capacity is reached, it will expand the filter to a capacity
            of 4000, then 16000, and so on. This can be tweaked in the settings::

                settings.cli.models.bloom.scalable.capacity = 2000 # sets filter's default initial capacity
                settings.cli.models.bloom.scalable.growth = 2      # sets filter's growth factor when it runs out of space

            Because fundamentally, bloom filters can't grow, internally this will create multiple
            bloom filters with increasing capacity, and whenever you search for a term, it will have
            to search through multiple filters to get the answer. So even though you can leave the
            capacity empty, it will degrade performance a little bit, which might be undesirable.

        :param n: number of elements to be stored, aka the filter's capacity
        :param p: false positive probability (put 0.1 for 10% false positive, 0.01 for 1%)
        :param overflow: if True, allows appending more elements than the capacity of the filter, else (default) don't allow it""" # bloom
        self.n = n; self.p = p; self.overflow = overflow                         # bloom
[docs] def __ror__(self, it):                                                    # bloom
        n = self.n; p = self.p                                                   # bloom
        bf = pybloom_live.ScalableBloomFilter(settings.bloom.scalable.capacity, p, settings.bloom.scalable.growth) if n is None else pybloom_live.BloomFilter(n, p) # bloom
        if self.overflow:                                                        # bloom
            for e in it:                                                         # bloom
                try: bf.add(e)                                                   # bloom
                except IndexError: bf.count = 0; bf.add(e)                       # bloom
        else:                                                                    # bloom
            for e in it: bf.add(e)                                               # bloom
        return bf                                                                # bloom
[docs] @staticmethod                                                             # bloom
    def join():                                                                  # bloom
        def inner(bfs):                                                          # bloom
            bfs = list(bfs)                                                      # bloom
            if len(set([type(b) for b in bfs])) > 1: raise Exception("Can't join normal filters and scalable filters together. Please specify a common capacity to all of the bloom filters") # bloom
            if type(bfs[0]) == pybloom_live.BloomFilter: # my implementation, which should be a little faster than builtin .union() # bloom
                b = bfs[0].copy()                                                # bloom
                for b_ in bfs[1:]: b.bitarray |= b_.bitarray                     # bloom
                return b                                                         # bloom
            else: # has to use complex .union() in this case                     # bloom
                b = bfs[0]                                                       # bloom
                for b_ in bfs[1:]: b = b.union(b_)                               # bloom
                return b                                                         # bloom
        return cli.aS(inner)                                                     # bloom
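# A minimal sketch of merging bloom filters built in parallel, following the docstring example
# above (illustrative only, not part of the library source; "file-0.txt" ... "file-9.txt" are
# hypothetical file names, and a common capacity is given so that plain filters can be joined):
def _bloom_join_demo():
    bf = range(10) | cli.applyMp(lambda i: cli.cat(f"file-{i}.txt") | bloom(n=100_000)) | bloom.join()
    return "some line" in bf # True if the line is (probably) present in any of the files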
try:                                                                             # bloom
    import pybloom_live                                                          # bloom
    @k1lib.patch(pybloom_live.pybloom.BloomFilter)                               # bloom
    def __or__(self, other):                                                     # bloom
        if isinstance(other, BaseCli): return other.__ror__(self)                # bloom
        return self.union(other)                                                 # bloom
except: pass                                                                     # bloom