I have several machine learning tools running on my own cluster, hosted
at https://mlexps.com/#kapi, and this module contains functions, classes and
clis that contact that service. Without it, if I wanted to use a language
model in multiple notebooks, I'd have to load the model onto my GPU for each
notebook, which would waste a lot of resources, and I couldn't run many notebooks
at the same time as I'd just run out of VRAM. So, by having dedicated services/demos,
I can really focus on serving things well and making them performant. For example::
"some text" | kapi.embed() # returns embedding numpy array
"What is Python? " | kapi.complete() # returns string, completes the sentence
"image.png" | toImg() | kapi.ocr() # returns `Ocr` object, with bounding boxes and text content of all possible texts
"cute anime girl" | kapi.txt2im() # generates an image from some description
"image.png" | toImg() | kapi.caption() # generates a caption of an image
# Names exported by `from <this module> import *`
__all__ = ["status", "segment", "demo", "embed", "embeds", "complete",
"ocr", "Ocr", "OcrBox", "tess",
"txt2im", "caption", "speech", "summarize", "post"]
from k1lib.cli.init import BaseCli; import k1lib.cli.init as init
import k1lib.cli as cli, k1lib, base64, html, json
# `requests` is obtained through `k1lib.dep` instead of a direct import; `k1` is a short alias for `k1lib`
requests = k1lib.dep.requests; k1 = k1lib
settings = k1lib.settings.cli
# Adds a "kapi" settings object under the cli settings. Its `local` flag is read by `get` and `demo.__ror__` to pick the base url
s = k1lib.Settings(); settings.add("kapi", s, "cli.kapi settings")
s.add("local", False, "whether to use local url instead of remote url. This only has relevance to me though, as the services are running on localhost")
def get(idx:str, json):
    """Sends a request to any service/demo on https://mlexps.com.
Example::

    # returns "13.0"
    kapi.get("demos/1-arith", {"a": 1, "b": 3, "c": True, "d": 2.5, "e": 10})

:param idx: index of the service, like "kapi/1-embed"
:param json: payload, sent as the json body of the POST request
:return: the ``data`` field of the json response
:raises Exception: if the http request is not ok, or if the response's ``success`` field is falsy"""
    baseUrl = "https://local.mlexps.com"
    if s.local: baseUrl = "http://localhost:9000"
    route = idx.replace('/', '_') # services are addressed with "_" in place of "/"
    response = requests.post(f"{baseUrl}/routeServer/{route}", json=json)
    if not response.ok:
        raise Exception(f"{response.status_code} - {response.reason}")
    payload = response.json()
    if payload["success"]:
        return payload["data"]
    raise Exception(payload["reason"])
def jsF_get(idx, dataIdx):
    """Builds a JS expression (as a Python string) that POSTs ``dataIdx`` as json
to the service ``idx`` on the remote url and awaits the decoded json response.

:param idx: index of the service, like "kapi/2-complete"
:param dataIdx: JS source text of the value to send. It's spliced verbatim inside ``JSON.stringify(...)``"""
    endpoint = "https://local.mlexps.com/routeServer/" + idx.replace("/", "_")
    options = '{method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify(' + f"{dataIdx}" + ")}"
    return f'await (await fetch("{endpoint}", {options})).json()'
def status():
    """Displays a table of whether the services are online and available or not.
A service counts as online when its ``healthCheck`` route answers with the text "ok"."""
    services = ["kapi/1-embed", "kapi/2-complete", "kapi/3-ocr", "kapi/4-txt2im", "kapi/5-caption", "kapi/6-speech"]
    def probe(service):
        healthUrl = f"https://local.mlexps.com/routeServer/{service.replace('/', '_')}/healthCheck"
        return [service, requests.get(healthUrl).text == "ok"]
    services | cli.apply(probe) | cli.insert(["Service", "Online"]) | cli.display(None)
class segment(BaseCli):
    def __init__(self, limit:int=2000):
        """Segments the input string by sentences, such that each segment's length is lower than the specified limit.
Example::

    # returns ['some. Really', 'Long. String', 'Just. Monika']
    "some. Really. Long. String. Just. Monika" | segment(15)

So, this will split the input string by ". ", then incrementally joins the strings together into segments.
This is useful in breaking up text so that it fits within language model's context size

:param limit: maximum number of characters in each segment"""
        self.limit = limit
    def __ror__(self, text):
        if not isinstance(text, str): raise Exception("Input is not a string!")
        maxLen = self.limit; groups = [[]]; used = 0
        for sentence in text.split(". "):
            size = len(sentence)
            # only start a new segment if the current one already has content, so
            # a single oversized sentence still ends up as a segment of its own
            if used > 0 and used + size > maxLen:
                groups.append([]); used = 0
            groups[-1].append(sentence); used += size + 2 # +2 accounts for the ". " separator
        return [". ".join(group) for group in groups]
metas = {} # Dict[prefix -> demo meta], module-level cache shared by all `demo` instances
class demo(BaseCli):
    def __init__(self, prefix:str="demos_1-arith"):
        """Sends a request to one of mlexps.com demos.
Example::

    # returns 21.0
    {"a": 3} | kapi.demo("demos/1-arith")
    # builds js interface that displays 21.0
    {"a": 3} | (toJsFunc() | kapi.demo("demos/1-arith")) | op().interface("jsone")
    # same as above, but the dictionary is formed in JS instead of Python
    3 | (toJsFunc() | aS("{'a': x}") | demo("demos/1-arith")) | op().interface("jsone")

You don't have to specify all params, just the ones you want to deviate from the defaults

:param prefix: demo index, either "demos/1-arith" or "demos_1-arith"
:raises Exception: if the demo's ``demo_meta.json`` can't be fetched"""
        prefix = prefix.replace(*"/_"); self.prefix = prefix
        if prefix not in metas: # fetch the metadata once per prefix instead of on every instantiation
            prefixSlash = prefix.replace(*'_/')
            res = requests.get(f"https://mlexps.com/{prefixSlash}/demo_meta.json")
            if not res.ok: raise Exception(f"Demo {prefixSlash} not found!")
            metas[prefix] = json.loads(res.text)
    def __ror__(self, d):
        """Calls the demo with the demo's default arguments, overridden by the entries of dict ``d``"""
        prefix = self.prefix; meta = metas[prefix]; kw = {}
        for arg in meta["args"]: # start from the demo's declared defaults
            a = meta["defaults"][arg]; anno = meta["annos"][arg]
            if anno in ("checkbox", "bytes", "image", "serialized"): pass # default is used as-is
            elif anno == "dropdown": a = a[1][a[0]] # default is indexed as options[selectedIndex]
            elif anno == "apiKey": a = k1lib.apiKey if hasattr(k1lib, "apiKey") else a[0]
            else: a = a[0]
            kw[arg] = k1lib.serve.webToPy(a, anno)
        for k, v in d.items(): kw[k] = v # user-supplied values override the defaults
        for k, v in kw.items(): kw[k] = k1lib.serve.pyToWeb(v, meta["annos"][k])
        # NOTE(review): `get` uses port 9000 for the local url while this uses 9003 - confirm which is intended
        url = "http://localhost:9003" if k1lib.settings.cli.kapi.local else "https://local.mlexps.com"
        res = requests.post(f"{url}/routeServer/{prefix}", json=kw)
        if not res.ok: raise Exception(res.reason)
        res = res.json()
        if res["success"]: return k1lib.serve.webToPy(res["data"], meta["annos"]["return"])
        else: raise Exception(res["reason"])
    def __repr__(self): return f"<demo prefix='{self.prefix}'>"
    def _repr_html_(self): s = html.escape(f"{self}"); return f"{s}{metas[self.prefix]['mainDoc']}"
    def _jsF(self, meta):
        """Returns ``(js source, function name)`` of an async JS function that calls this demo from the browser"""
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto(); prefix = self.prefix; prefixSlash = prefix.replace(*"_/")
        # json.dumps yields a properly escaped JS string literal, so quotes inside the key can't break the script
        apiKey = json.dumps(str(getattr(k1lib, "apiKey", "")))
        return f"""\
{fIdx} = async ({dataIdx}) => {{
    const meta = await (await fetch("https://mlexps.com/{prefixSlash}/demo_meta.json")).json();
    const kw = {{}};
    for (const arg of meta.args) {{
        let a = meta.defaults[arg]; const anno = meta.annos[arg];
        if (["checkbox", "bytes", "image", "serialized"].includes(anno)) a = a;
        else if (anno === "dropdown") a = a[1][a[0]];
        else if (anno === "apiKey") a = {apiKey};
        else a = a[0];
        kw[arg] = a;
    }}
    for (const [k,v] of Object.entries({dataIdx})) {{ kw[k] = v; }}
    let res = await (await fetch("https://local.mlexps.com/routeServer/{prefix}", {{ method: "POST", headers: {{ "Content-Type": "application/json" }}, "body": JSON.stringify(kw) }})).text();
    try {{ res = JSON.parse(res); }} catch (e) {{ throw new Error(`Can't decode json of '${{res}}'`); }}
    if (!res.success) throw new Error(`Request failed: '${{res.reason}}'`);
    let data = res.data;
    if (meta.annos.return === "html") return atob(data);
    if (meta.annos.return === "image") return `<img src="data:image;base64,${{data}}">`
    return data;
}}""", fIdx
class embed(BaseCli):
    def __init__(self):
        """Gets an embedding vector for every sentence piped into this using `all-MiniLM-L6-v2`.
Example::

    # returns (384,)
    "abc" | kapi.embed() | shape()
    # returns (2, 384)
    ["abc", "def"] | kapi.embed().all() | shape()

- VRAM: 440MB
- Throughput: 512/s

See also: :class:`~k1lib.cli.models.embed`"""
        pass
    def __ror__(self, it):
        # a single input is just a batch of one
        return self._all_opt([it]) | cli.item()
    def _all_opt(self, it:"list[str]"):
        # one request to the service per batch of up to 1024 lines
        for batch in it | cli.batched(1024, True):
            vectors = get("kapi/1-embed", {"lines": k1lib.encode(batch)}) | cli.aS(k1lib.decode)
            for vector in vectors: yield vector
class embeds(BaseCli):
    def __init__(self):
        """Breaks up some text and grab the embedding vectors of each segment.
Example::

    "some long text" | kapi.embeds() # returns list of (segment, numpy vector)

This is just a convenience cli. Internally, this splits the text up using :class:`segment`
and then embeds each segment using :class:`embed`
"""
        pass
    def __ror__(self, it):
        return self._all_opt([it]) | cli.item()
    def _all_opt(self, it:"list[str]"):
        splitter = segment(700)
        both = cli.iden() & embed().all() # keeps the segments alongside their embeddings
        perText = splitter | both | cli.transpose()
        return it | cli.apply(perText) | cli.deref()
class complete(BaseCli):
    def __init__(self, prompt:str=None, maxTokens:int=200):
        """Generates text from predefined prompts using `Llama 2`.
Example::

    # returns string completion
    "What is Python?" | kapi.complete()
    # returns list of string completions
    ["What is Python?", "What is C++?"] | kapi.complete().all()
    # returns list of string completions. The prompts sent to the server are ["<paragraph 1>\\n\\n\\nPlease summarize the above paragraph: ", ...]
    ["<paragraph 1>", "<paragraph 2>"] | kapi.complete("Please summarize the above paragraph").all()

- VRAM: 22GB
- Throughput: 8/s

:param prompt: if specified, "\\n\\n\\n<prompt>: " is appended to every input before it's sent
:param maxTokens: maximum amount of tokens

See :class:`~k1lib.cli.models.complete`. That one is an older version using Google Flan T5 instead of llama 2"""
        self.prompt = prompt; self.maxTokens = maxTokens
    def _suffix(self) -> str:
        """Text appended to every input. Shared by the Python and JS paths so that both send identical prompts"""
        return f"\n\n\n{self.prompt}: " if self.prompt else ""
    def __ror__(self, it): return self._all_opt([it]) | cli.item()
    def _all_opt(self, it:"list[str]"):
        """Sends all prompts in 1 request, returns list of completions.

:raises Exception: if the input is not a collection of strings"""
        hint = "You might have forgot to use .all(), like ['str1', 'str2'] | kapi.complete().all()"
        if isinstance(it, str): raise Exception(hint)
        it = list(it) # also accepts generators
        if len(it) == 0: return [] # nothing to complete, don't bother the server
        # validate before the prompt gets appended, else the f-string below would silently stringify anything
        if not isinstance(it[0], str): raise Exception(hint)
        suffix = self._suffix()
        if suffix: it = [f"{x}{suffix}" for x in it]
        it = it | cli.apply(lambda x: [x, self.maxTokens]) | cli.deref()
        return get("kapi/2-complete", {"prompts": json.dumps(it)}) | cli.aS(json.loads)
    def _jsF(self, meta):
        """Returns ``(js source, function name)`` of an async JS function that completes 1 string"""
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()
        # json.dumps gives a safely escaped JS string literal, so prompts containing backticks or "${" can't break the script
        suffix = json.dumps(self._suffix())
        body = f"{{ prompts: JSON.stringify([{dataIdx}].map((x) => [`${{x}}` + {suffix}, {cli.kjs.v(self.maxTokens)}])) }}"
        return f"""
{fIdx} = async ({dataIdx}) => {{
    const res = {jsF_get('kapi/2-complete', body)}
    return res[0]
}}""", fIdx
tf = k1.dep("torchvision.transforms")
class ocr(BaseCli):
    def __init__(self, paragraph:bool=False, resize=True, bs:int=10):
        """Do OCR (optical character recognition) on some image.
Example::

    o = "some_image.png" | toImg() | kapi.ocr() # loads image and do OCR on them
    o     # run this in a separate notebook cell for an overview of where the boxes are
    o.res # see raw results received from the OCR service

That returns something like this::

    [[[771, 5, 813, 17], 'round', 0.7996242908503107],
     [[58, 10, 100, 34], '150', 0.883547306060791],
     [[166, 8, 234, 34], '51,340', 0.9991665158446097],
     [[782, 14, 814, 38], '83', 0.9999995785315409],
     [[879, 13, 963, 33], 'UPGRADes', 0.7625563055298393],
     [[881, 53, 963, 69], 'Monkey Ace', 0.9171751588707685],
     [[929, 341, 971, 361], '5850', 0.9974847435951233]]

First column is the bounding box (x1, y1, x2, y2), second column is the text,
and third column is the confidence, from 0 to 1.

Internally, this uses EasyOCR for the recognition. However, from my experience,
this doesn't always get it right. It's particularly bad at symbols like dollar
signs (it thinks it's "S", or "5" instead), periods or commas. So, you can refine
each of the bounding boxes like this::

    ocr = someImg | kapi.ocr()
    ocr[4] | toImg() | kapi.tess() # returns string, uses tesseract OCR instead of EasyOCR for more accuracy for a less complex scene

See also: :class:`Ocr`

- Throughput: depends heavily on image resolution, but for 1000x750 images, should be 3-4 images/s

:param paragraph: whether to try to combine boxes together or not
:param resize: whether to resize the images to a reasonable size before sending it over or not. Runs faster if true
:param bs: how many images should this group together and send to the server at once?"""
        self.paragraph = paragraph; self.resize = resize; self.bs = bs
    def __ror__(self, it):
        return self._all_opt([it]) | cli.item()
    def _all_opt(self, it:"list[PIL.Image.Image]"):
        def shrink(imgs): # scales images down so that their longest side is at most 1000
            for img in imgs:
                w, h = img | cli.shape()
                longest = w if w > h else h
                if longest > 1000:
                    frac = 1000/longest; img = img | tf.Resize([int(h*frac), int(w*frac)])
                yield img, self.paragraph
        def recognize(imgParas): # sends 1 batch to the service, then pairs each input up with its result
            payload = imgParas | cli.apply(cli.toBytes(), 0) | cli.deref()
            results = get("kapi/3-ocr", {"data": k1.encode(payload)}) | cli.aS(k1.decode)
            return [imgParas, results] | cli.transpose()
        if self.resize: pairs = shrink(it)
        else: pairs = it | cli.apply(lambda img: [img, self.paragraph])
        return pairs | cli.batched(self.bs, True) | cli.apply(recognize) | cli.joinSt() | ~cli.apply(Ocr)
class Ocr:
    def __init__(self, imgPara, res):
        """Ocr result object. Stores raw results from model in ``.res`` field and has many
more functionalities. Not intended to be instantiated by the end user. Example::

    ocr = someImg | kapi.ocr()     # ocr is an object of type Ocr
    ocrBox = ocr[3]                # grabs the 3rd detected bounding box
    ocrBox.coords                  # grabs coordinates
    ocrBox.text                    # grabs recognized text
    ocrBox.confidence              # grabs confidence
    ocrBox | toImg()               # grabs image cutout
    ocrBox | toNdArray()           # grabs numpy array cutout
    ocrBox | toImg() | kapi.tess() # returns string, pass the image through tesseract OCR, to get more reliable results

See also: :class:`OcrBox`

:param imgPara: (image, paragraph flag) pair that was sent to the service
:param res: raw results, each row being [box, text, confidence]"""
        self.img, self.para = imgPara; self._npImg = None
        # going by the lambda's argument names, raw boxes arrive as (x1, x2, y1, y2) and are stored as (x1, y1, x2, y2)
        self.res = res | cli.apply(~cli.aS(lambda x1,x2,y1,y2: [x1,y1,x2,y2]), 0) | cli.deref()
    def npImg(self):
        """Grabs the numpy array of the image, shape (C, H, W)"""
        if self._npImg is None: self._npImg = self.img | cli.toNdArray() # converted lazily, then cached
        return self._npImg
    def __repr__(self): return f"<Ocr shape={self.img | cli.shape()}>"
    def _overlay(self) -> "PIL":
        """Blends the image with a sketch of all bounding boxes and their recognized texts"""
        img = self.img; res = self.res; p5 = k1.p5; w, h = img | cli.shape()
        p5.newSketch(*(img | cli.shape())); p5.background(255); p5.fill(255, 0)
        # y coordinates are flipped (h - y) before being handed to the sketch
        res | cli.cut(0) | ~cli.apply(lambda x1,y1,x2,y2: [x1,h-y2,x2-x1,y2-y1]) | ~cli.apply(p5.rect) | cli.deref()
        res | cli.cut(0, 1) | ~cli.apply(lambda x1,y1,x2,y2: [min(x1,x2), h-max(y1,y2)], 0) | ~cli.apply(lambda xy,s: [s,*xy]) | ~cli.apply(p5.text) | cli.deref()
        im2 = p5.img(); alpha = 0.3
        return [img, im2] | cli.apply(cli.toTensor() | cli.op()[:3]) | ~cli.aS(lambda x,y: x*alpha+y*(1-alpha)) | cli.op().to(int) | cli.op().permute(1, 2, 0) | cli.toImg()
    def _repr_html_(self):
        s = html.escape(f"{self}")
        return f"<pre>{s}</pre><img src='data:image/jpeg;base64, {base64.b64encode(self._overlay() | cli.toBytes()).decode()}' />"
    def __getitem__(self, s):
        """Grabs 1 :class:`OcrBox`, or a list of them if ``s`` is a slice.

:raises IndexError: if the index is out of range"""
        if isinstance(s, slice): return [OcrBox(self, i) for i in range(len(self.res))[s]]
        # OcrBox looks up its data lazily, so validate the index eagerly here. Without the
        # IndexError, `for box in ocr` (which falls back to __getitem__) would never terminate
        self.res[s]
        return OcrBox(self, s)
    def __len__(self): return len(self.res)
    def __getstate__(self): # better compression due to converting to jpg
        d = {**self.__dict__}; d["img"] = self.img | cli.toBytes(); d["_npImg"] = None; return d
    def __setstate__(self, d):
        self.__dict__.update(d); self.img = self.img | cli.toImg(); self._npImg = None
class OcrBox:
    def __init__(self, ocr, i):
        """1 bounding box of the ocr-ed image. Not intended to be instantiated by the end user.
Example::

    ocr = someImg | kapi.ocr()
    ocrBox = ocr[3] # grabs the 3rd detected bounding box

See also: :class:`Ocr`

:param ocr: the :class:`Ocr` object this box belongs to
:param i: index of this box's row inside ``ocr.res``"""
        self.i = i; self.ocr = ocr
    @property
    def coords(self):
        row = self.ocr.res[self.i]; return row[0]
    @property
    def text(self):
        row = self.ocr.res[self.i]; return row[1]
    @property
    def confidence(self):
        row = self.ocr.res[self.i]; return row[2]
    def _toNdArray(self):
        # crops the box out of the parent image, keeping all channels
        x1, y1, x2, y2 = self.ocr.res[self.i][0]
        pixels = self.ocr.npImg()
        return pixels[:, y1:y2, x1:x2]
    def _toImg(self, **kwargs):
        cutout = self._toNdArray().transpose((1, 2, 0)) # channels go last before the image conversion
        return cutout | cli.toImg()
    def __repr__(self):
        conf = round(self.confidence, 3)
        return f"<OcrBox i={self.i} coords={self.coords} confidence={conf} text='{self.text}' />"
    def _repr_html_(self):
        s = html.escape(f"{self}")
        encoded = base64.b64encode(self | cli.toImg() | cli.toBytes()).decode()
        return f"<pre>{s}</pre><img src='data:image/jpeg;base64, {encoded}' />"
# Registers both result classes through `init.addAtomic`. Presumably this makes clis (like deref) treat them as opaque values instead of iterating into them - TODO confirm against k1lib.cli.init
init.addAtomic(Ocr); init.addAtomic(OcrBox) # OcrBox
class tess(BaseCli):
    def __init__(self):
        """Do OCR using tesseract, instead of easyocr. This is meant for simple images only,
preferably sections cut off from :class:`ocr`. For complex bounding box detection, still
use :class:`ocr`. Example::

    # returns "some text"
    image | kapi.tess()

For small texts, can reach throughput up to 75/s"""
        pass
    def __ror__(self, it):
        return self._all_opt([it]) | cli.item()
    def _all_opt(self, it:"list[str]"):
        # one request to the service per batch of up to 75 inputs
        for batch in it | cli.batched(75, True):
            for text in get("kapi/12-tess", {"imgs": k1lib.encode(batch)}): yield text
class txt2im(BaseCli):
    def __init__(self, num_inference_steps=10):
        """Generates images from text descriptions, using stable diffusion v2.
Example::

    "a bowl of apples" | kapi.txt2im() # returns PIL image

- VRAM: 5.42GB
- Throughput: 1/s

:param num_inference_steps: forwarded as-is to the service, under the same name
"""
        self.num_inference_steps = num_inference_steps
    def __ror__(self, it):
        request = {"prompt": it, "num_inference_steps": self.num_inference_steps}
        encoded = get("kapi/4-txt2im", request) # base64-encoded image bytes
        return encoded | cli.aS(base64.b64decode) | cli.toImg()
class caption(BaseCli):
    def __init__(self):
        """Captions images using model `Salesforce/blip-image-captioning-large`.
Example::

    img = "some_image.png" | toImg() # loads PIL image
    img | kapi.caption()             # returns string description

- VRAM: 2.5GB
- Throughput: 16/s
"""
        pass
    def __ror__(self, it):
        return self._all_opt([it]) | cli.item()
    def _all_opt(self, it:"list[PIL.Image.Image]"):
        def describe(imgs): # sends 1 batch of images to the service
            return get("kapi/5-caption", {"images": k1lib.encode(imgs)}) | cli.aS(k1lib.decode)
        return it | cli.batched(5, True) | cli.apply(describe) | cli.joinSt()
class speech(BaseCli):
    def __init__(self, sep=False):
        """Converts English speech to text using whisper-large-v2.
Example::

    "audio.mp3" | toAudio() | kapi.speech() # returns string transcript

- Throughput: 20min video finish transcribing in ~25s, so around 60x faster than real time

If the input audio is too long (>25 minutes), then it will be broken up
into multiple smaller pieces and sent to the server one piece at a time,
so at the bounds, it might go wrong a little bit

:param sep: if True, separate transcripts of each segment (returns List[transcript]),
    if False (default), joins segment's transcripts together into a single string"""
        self.sep = sep
    def __ror__(self, audio:"conv.Audio"):
        minutes = audio.raw.duration_seconds/60
        nSplits = int(minutes/25)+1 # 1 extra piece for every full 25 minutes
        def transcribe(piece):
            return get("kapi/6-speech", {"audio": base64.b64encode(piece | cli.toBytes()).decode()})
        transcripts = audio | cli.splitW(*[1]*nSplits) | cli.apply(transcribe)
        if self.sep: return list(transcripts)
        return transcripts | cli.join(". ")
def _summarize(text:str) -> str:
    """Runs 1 summarization stage: splits ``text`` into segments of at most 2000
characters, summarizes each segment with :class:`complete` and joins the stripped
summaries back together with ". "."""
    # `complete` appends ": " after the prompt on its own, so the prompt must not end
    # with one, else the server receives "...1-3 sentences: : "
    prompt = "<|end of transcript|>\n\nPlease summarize the above transcript using 1-3 sentences"
    return text | segment(2000) | complete(prompt).all()\
        | cli.op().strip().all() | cli.deref() | cli.join(". ")
class summarize(BaseCli):
    def __init__(self, length=1000):
        """Summarizes text in multiple stages until it's shorter than ``length`` in
characters or until further compression is not possible. Example::

    url = "https://www.youtube.com/watch?v=NfmSjGbnEWk"
    audio = url | toAudio()      # downloads audio from youtube
    text = audio | kapi.speech() # does speech recognition
    text | summarize()           # summarize the text. For a 23 minute video/22k characters text, it should take around 23s to summarize everything

This will return an array of strings::

    ["shortened text final stage",
     "shortened text stage 2",
     "shortened text stage 1",
     "original text"]

So in each stage, the original text is split up into multiple pieces, then
each piece is summarized using :class:`complete` and then all summary will
be joined together, creating the "shortened text stage 1". This continues
until the text's length does not decrease any further, or it's shorter
than the desired length.

:param length: desired summary string length"""
        self.length = length
    def __ror__(self, text:str):
        stages = [text]
        while len(text) >= self.length:
            before = len(text); text = _summarize(text); stages.append(text)
            # stop early if the text barely shrinks. The `before == 0` guard avoids dividing
            # by zero when an empty text gets here (possible when length <= 0)
            if before == 0 or len(text)/before > 0.8: break
        return stages | cli.reverse() | cli.deref()
class post(BaseCli):
    def __init__(self, url):
        """Creates a post request from a URL that can be read using :meth:`~k1lib.cli.inp.cat`.
Example::

    # returns str of the results
    {"some": "json data"} | kapi.post("https://some.url/some/path")

Notice how there isn't a get request counterpart, because you can always just cat() them
directly, as get requests don't have a body.

:param url: url to send the POST request to
"""
        self.url = url
    def __ror__(self, d):
        """Sends ``d`` as the json body, returns the response's text"""
        return requests.post(self.url, json=d).text
    def _jsF(self, meta):
        """Returns ``(js source, function name)`` of an async JS function doing the same request from the browser"""
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()
        # the url is embedded as an escaped JS string literal everywhere, including in the error
        # message, so urls containing backticks or "${" can't break the generated script
        url = json.dumps(self.url)
        return f"""\
{fIdx} = async ({dataIdx}) => {{
    const res = await fetch({url}, {{ method: "POST", headers: {{ "Content-Type": "application/json" }}, body: JSON.stringify({dataIdx}) }});
    if (res.ok) return await res.text();
    throw new Error("Can't send POST request to '" + {url} + `': ${{res.status}} - ${{res.statusText}}`);
}}""", fIdx