# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
"""This module for tools that will likely start the processing stream."""
from typing import Iterator, Union, Any, List
import k1lib, urllib, subprocess, warnings, os, threading, time, math, io, dill, validators
from collections import deque
from k1lib.cli import BaseCli, init; import k1lib.cli as cli
from k1lib.cli.typehint import *
from contextlib import contextmanager
requests = k1lib.dep.requests
try: import minio; hasMinio = True
except: hasMinio = False
__all__ = ["cat", "catPickle", "splitSeek", "refineSeek", "wget", "ls", "cmd", "walk", "urlPath", "kzip", "kunzip", "unzip"]
settings = k1lib.settings.cli
class NoPartialContent(Exception): pass # NoPartialContent
def getChunk(url:str, sB:int, eB:int, timeout:float, retries:int) -> bytes: # start inclusive, end exclusive # getChunk
for i in range(retries): # getChunk
try: res = requests.get(url, headers={"Range": f"bytes={sB}-{eB-1}"}, timeout=timeout) # getChunk
except Exception as e: # getChunk
if i >= retries-1: raise Exception(f"Can't get file chunk from {url}: {e}") # getChunk
continue # getChunk
if res.status_code != 206: raise NoPartialContent(f"Server doesn't allow partial downloads at this particular url. Status code: {res.status_code}") # getChunk
return res.content # getChunk
def getChunks(url:str, sB:int, eB:int, chunkSize=None, chunkTimeout:float=10, chunkRetries:int=10) -> List[bytes]: # getChunks
"""Grabs bytes from sB to eB in chunks""" # getChunks
chunkSize = chunkSize or settings.cli.cat.chunkSize # getChunks
return range(sB, eB+1) | cli.batched(chunkSize, True) | cli.apply(lambda r: getChunk(url, r.start, r.stop, chunkTimeout, chunkRetries)) # getChunk's end byte is exclusive, so pass the subrange's stop directly # getChunks
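# A minimal usage sketch for the two helpers above (the url is hypothetical and assumes
# the server supports HTTP Range requests):
#   url = "https://example.com/bigFile.bin"
#   header = getChunk(url, 0, 16, 10, 3)              # bytes 0-15 (end exclusive), 10s timeout, 3 retries
#   blocks = getChunks(url, 0, 999_999) | cli.deref() # list of ~100kB byte chunks covering bytes 0-999999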
catSettings = k1lib.Settings().add("chunkSize", 100000, "file reading chunk size for binary+chunk mode. Decrease it to avoid wasting memory and increase it to avoid disk latency") # getChunks
catSettings.add("every", k1lib.Settings().add("text", 1000, "for text mode, will print every n lines").add("binary", 10, "for binary mode, will print every n 100000-byte blocks"), "profiler print frequency") # getChunks
settings.add("cat", catSettings, "inp.cat() settings") # getChunks
# getChunks
rfS = k1lib.Settings() # getChunks
settings.add("RemoteFile", rfS, "inp.RemoteFile() settings, used in cat(), splitSeek() and the like") # getChunks
rfS.add("memoryLimit", 100_000_000, "if the internal cache exceeds this limit (in bytes), and randomAccess is False, then old downloaded chunks will be deleted") # getChunks
rfS.add("timeout", 10, "seconds before terminating the remote request and retrying") # getChunks
rfS.add("retries", 10, "how many times to retry sending the request before giving up") # getChunks
def noPartial(url, *args): # noPartial
try: return len(getChunk(url, 0, 10, *args)) != 10 # noPartial
except NoPartialContent: return True # noPartial
class RemoteFile: # RemoteFile
def __init__(self, url, randomAccess=True, blockSize=None, noPartialConfirm=False, timeout:float=None, retries:int=None): # RemoteFile
"""
:param url: url of the remote file
:param randomAccess: is random accessing parts of the file expected? If
True, then keeps all of the reads in ram internally, else free them
as soon as possible
:param blockSize: all reads will fetch roughly this amount of bytes""" # RemoteFile
self.url = url; self.randomAccess = randomAccess; self.blockSize = blockSize or settings.cat.chunkSize # RemoteFile
self.noPartialConfirm = noPartialConfirm; self.size = None; self.domain = k1lib.Domain() # RemoteFile
self.seekPos = 0; self.reads = deque() # List[sB, eB, content] # RemoteFile
self._confirmMsgShown = False; self.timeout = timeout or rfS.timeout; self.retries = retries or rfS.retries # RemoteFile
self._totalReadSize = 0; self.noPartial = noPartial(url, self.timeout, self.retries) # RemoteFile
def _fetch(self, sB:int, eB:int): # fetches from start to end byte and dumps to internal memory. Inclusive start and end byte # RemoteFile
if not self.noPartial: # RemoteFile
eB = max(eB, min(sB+self.blockSize, len(self))) # RemoteFile
chunk = getChunk(self.url, sB, eB, self.timeout, self.retries) # RemoteFile
else: # RemoteFile
if self.noPartialConfirm and not self._confirmMsgShown: # RemoteFile
ans = input(f"""Remote file '{self.url}' don't support partial downloads.
Therefore the entire file will be loaded into RAM, which
could be undesireable. Do you want to continue? Y/n: """) # RemoteFile
self._confirmMsgShown = True # RemoteFile
if ans.lower()[0] != "y": self.reads.append([0, 0, b""]); return # RemoteFile
sB = 0; chunk = requests.get(self.url).content; eB = len(chunk) # RemoteFile
self.reads.append([sB, eB, chunk]) # RemoteFile
self._totalReadSize += len(chunk); self.domain = self.domain + k1lib.Domain([sB, eB]) # RemoteFile
if not self.randomAccess and self._totalReadSize > rfS.memoryLimit: # deletes old reads # RemoteFile
sB, eB, chunk = self.reads.popleft() # RemoteFile
self._totalReadSize -= len(chunk) # RemoteFile
def _ensureRange(self, sB, eB): # makes sure that all ranges between sB and eB are available # RemoteFile
missingDomain = k1lib.Domain([max(sB-30, 0), min(eB+30, len(self))]) & -self.domain # RemoteFile
for sB, eB in missingDomain.ranges: self._fetch(sB, eB) # RemoteFile
def _readChunks(self, sB, eB): # read from sB to eB, but in chunks, to be optimized. inclusive sB, exclusive eB # RemoteFile
sB = max(min(sB, len(self)), 0); eB = max(min(eB, len(self)), 0); self._ensureRange(sB, eB) # RemoteFile
return self.reads | cli.filt(~cli.aS(lambda s,e,chunk: e>=sB and s<=eB)) | cli.sort() | ~cli.apply(lambda s,e,chunk: chunk[max(sB-s, 0):len(chunk)+min(eB-e, 0)]) | cli.filt(len) # RemoteFile
def seek(self, cookie, whence=0): # RemoteFile
if whence == 0: self.seekPos = cookie # RemoteFile
elif whence == 1: self.seekPos += cookie # RemoteFile
elif whence == 2: self.seekPos = len(self) + cookie # RemoteFile
else: raise Exception("Invalid whence") # RemoteFile
return self.seekPos # RemoteFile
def read(self, size, join=True): # RemoteFile
chunks = self._readChunks(self.seekPos, self.seekPos + size) # RemoteFile
self.seekPos += size; return b"".join(chunks) if join else chunks # RemoteFile
def readline(self, newLine=True): # RemoteFile
ans = []; seekPos = self.seekPos # RemoteFile
try: # RemoteFile
while self.seekPos < len(self): # RemoteFile
for chunk in self.read(self.blockSize, False): # RemoteFile
if len(chunk) == 0: raise SyntaxError() # RemoteFile
ans.append(chunk) # RemoteFile
if b"\n" in chunk: raise SyntaxError() # RemoteFile
except SyntaxError: pass # RemoteFile
ans = b"".join(ans) # RemoteFile
try: n = ans.index(b"\n") # RemoteFile
except ValueError: n = len(ans) # only happens at end of file # RemoteFile
self.seekPos = seekPos + n+1; return (ans[:n+1] if newLine else ans[:n]).decode() # RemoteFile
def readlines(self, newLine=True): # RemoteFile
while True: # RemoteFile
yield self.readline(newLine) # RemoteFile
if self.seekPos >= len(self): break # RemoteFile
def tell(self): return self.seekPos # RemoteFile
def _getSize(self): # RemoteFile
if self.noPartial: self._fetch(0, 10); return self.reads[0][1] # RemoteFile
for i in range(self.retries): # RemoteFile
try: return requests.head(self.url, timeout=self.timeout).headers.items() | cli.apply(cli.op().lower(), 0) | cli.toDict() | cli.op()["content-length"].ab_int() # RemoteFile
except Exception as e: # RemoteFile
if i >= self.retries-1: raise Exception(f"Can't get size of remote file: {e}") # RemoteFile
def __len__(self): # RemoteFile
if self.size is None: self.size = self._getSize() # RemoteFile
return self.size # RemoteFile
def __repr__(self): return f"<RemoteFile url={self.url} size={k1lib.fmt.size(len(self))}>" # RemoteFile
import zipfile, inspect # RemoteFile
class ZipWrapper: # ZipWrapper
def __init__(self, a, zfn): self.a = a; self.zfn = zfn # ZipWrapper
def __repr__(self): # ZipWrapper
a = self.a; s = f" ({round(a.compress_size/a.file_size*100)}%)" if a.file_size > 0 else "" # ZipWrapper
return f"<Zip subfile name='{a.filename}' {k1lib.fmt.size(a.file_size)} -> {k1lib.fmt.size(a.compress_size)}{s}>" # ZipWrapper
@property # ZipWrapper
def size(self): return self.a.file_size # ZipWrapper
@property # ZipWrapper
def compressedSize(self): return self.a.compress_size # ZipWrapper
def _catHandle(self): # ZipWrapper
with zipfile.ZipFile(self.zfn) as zipf: # ZipWrapper
with zipf.open(self.a.filename) as subfile: # ZipWrapper
yield subfile # ZipWrapper
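# ZipWrapper is what ls() yields for .zip files. It exposes subfile stats and plugs into
# cat()/unzip() through ._catHandle(). Sketch ("abc.zip" is hypothetical):
#   sub = ls("abc.zip") | item()   # grab the first subfile
#   sub.size, sub.compressedSize   # uncompressed/compressed sizes in bytes
#   sub | cat() | headOut()        # read it like any other file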
@contextmanager # ZipWrapper
def openFile(fn, text, noPartialConfirm=False): # can be actual file or url # openFile
if not isinstance(fn, str): # openFile
if hasattr(fn, "_catHandle"): yield from fn._catHandle(); return # custom datatype case # openFile
else: yield fn; return # file handle case, just return itself # openFile
if os.path.exists(fn): # openFile
if text: # openFile
with open(fn, "r", settings.cat.chunkSize) as f: yield f # openFile
else: # openFile
with open(fn, "rb", settings.cat.chunkSize) as f: yield f # openFile
elif validators.url(fn) is True: # openFile
yield RemoteFile(fn, False, noPartialConfirm=noPartialConfirm) # openFile
else: raise FileNotFoundError(f"The file {fn} doesn't seem to exist and/or it's not a valid url") # openFile
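# openFile() puts local paths, urls, raw file handles and custom datatypes (via their
# ._catHandle()) behind a single context manager. A small sketch, with hypothetical inputs:
#   with openFile("data.txt", True) as f: f.readline()                 # local text file
#   with openFile("https://example.com/a.bin", False) as f: f.read(10) # remote binary file
#   with openFile(io.BytesIO(b"abc"), False) as f: f.read()            # handles pass through untouched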
def _catGenText(fn, sB, eB): # fn for "file name" # _catGenText
try: # _catGenText
if sB == 0 and eB == -1: # fast path without bounds (90-160 MB/s expected) # _catGenText
with openFile(fn, True) as f: # _catGenText
line = f.readline() # _catGenText
if isinstance(line, str): # why put the if outside the loop? Speed reasons. Also, isn't .readline() supposed to return a string? Well, because we're supporting random file handles from custom datatypes, some ill-behaved file handles might return bytes instead # _catGenText
while True: # _catGenText
if line == "": return # _catGenText
yield line[:-1] if line[-1] == "\n" else line # _catGenText
line = f.readline() # _catGenText
else: # _catGenText
while True: # _catGenText
line = line.decode() # _catGenText
if line == "": return # _catGenText
yield line[:-1] if line[-1] == "\n" else line # _catGenText
line = f.readline() # _catGenText
else: # slow path with bounds (15 MB/s expected). Update: much faster now, expect only 40% slower than the path above # _catGenText
sB = wrap(fn, sB); eB = wrap(fn, eB) # _catGenText
with openFile(fn, True) as f: # _catGenText
f.seek(sB); b = sB # current byte # _catGenText
line = f.readline() # _catGenText
if isinstance(line, str): # _catGenText
while True: # _catGenText
b += len(line) # _catGenText
if len(line) == 0: return # _catGenText
if b > eB: yield line[:len(line)-(b-eB)]; return # _catGenText
yield line[:-1] if line[-1] == "\n" else line # _catGenText
line = f.readline() # _catGenText
else: # _catGenText
while True: # _catGenText
line = line.decode() # _catGenText
b += len(line) # _catGenText
if len(line) == 0: return # _catGenText
if b > eB: yield line[:len(line)-(b-eB)]; return # _catGenText
yield line[:-1] if line[-1] == "\n" else line # _catGenText
line = f.readline() # _catGenText
except FileNotFoundError: pass # _catGenText
def _catGenBin(fn, sB, eB): # _catGenBin
chunkSize = settings.cat.chunkSize; sB = wrap(fn, sB); eB = wrap(fn, eB); nB = eB - sB # number of bytes to read total # _catGenBin
with openFile(fn, False) as f: # _catGenBin
f.seek(sB); nChunks = math.ceil(nB / chunkSize); lastChunkSize = nB - chunkSize*(nChunks-1) # _catGenBin
# had to do this because RemoteFile is actually not thread-safe # _catGenBin
# applyF = (lambda f_: cli.apply(f_)) if isinstance(f, RemoteFile) else (lambda f_: cli.applyTh(f_, prefetch=10)) # _catGenBin
applyF = (lambda f_: cli.apply(f_)) # actually, normal reads are not thread-safe either. Stupid me # _catGenBin
yield from range(nChunks) | applyF(lambda i: f.read(chunkSize) if i < nChunks-1 else f.read(chunkSize)[:lastChunkSize]) # _catGenBin
def fileLength(fn): # fileLength
with openFile(fn, False) as f: return f.seek(0, os.SEEK_END) # fileLength
def wrap(fn, b): return b if b >= 0 else b + fileLength(fn) + 1 # wrap
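# wrap() resolves negative byte offsets the same way negative list indices work, with -1
# meaning "end of file". Worked example, assuming fn points to a 100-byte file:
#   wrap(fn, 20)  # returns 20
#   wrap(fn, -1)  # returns 100, same as fileLength(fn)
#   wrap(fn, -11) # returns 90, so cat(fn, sB=-11, eB=-1) would read the last 10 bytes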
class _cat(BaseCli): # _cat
def __init__(self, text, chunks, sB, eB): # _cat
super().__init__(capture=True) # _cat
self.text = text; self.chunks = chunks; self.sB = sB; self.eB = eB # _cat
def _typehint(self, ignored=None): # _cat
if self.text: return tIter(str) if self.chunks else tList(str) # _cat
else: return tIter(bytes) if self.chunks else bytes # _cat
def __ror__(self, fn:Union[str, "fileHandle"]) -> Union[Iterator[str], bytes]: # _cat
ser = self.capturedSerial; text = self.text; chunks = self.chunks; sB = self.sB; eB = self.eB # _cat
if not isinstance(fn, str) and hasattr(fn, "_cat"): # custom datatype, _cat method defined, so it will take control of things # _cat
kwargs = {"text": text, "chunks": chunks, "sB": sB, "eB": eB} # _cat
kwdiff = [a for a, b in [["text",text!=True], ["chunks",chunks!=True if text else chunks!=False], ["sB",sB!=0], ["eB",eB!=-1]] if b] # _cat
args = inspect.getfullargspec(fn._cat).args[1:]; n = len(args) # _cat
s = set(["ser", "kwargs"]); weirdArgs = [a for a in args if a not in s] # _cat
if len(weirdArgs) > 0: raise Exception(f"Custom datatype `{type(fn)}` has ._cat() method, which expects only `ser` and `kwargs` arguments, but detected these arguments instead: {weirdArgs}. Please fix `{type(fn)}`") # _cat
def guard(): # _cat
if kwdiff: raise Exception(f"Custom datatype `{type(fn)}` does not support custom cat() arguments like {kwdiff}") # _cat
if n == 0: guard(); return fn._cat() | ser # _cat
elif n == 1: # _cat
if args[0] == "ser": guard(); return fn._cat(ser) # _cat
if args[0] == "kwargs": return fn._cat(kwargs) | ser # _cat
elif n == 2: # _cat
if args[0] == "ser": return fn._cat(ser, kwargs) # _cat
else: return fn._cat(kwargs, ser) # _cat
else: raise Exception("Unreachable") # _cat
fn = os.path.expanduser(fn) if isinstance(fn, str) else fn # _cat
if text and chunks and k1lib._settings.packages.k1a and isinstance(fn, str) and os.path.exists(fn): # _cat
return k1lib._k1a.k1a.StrIterCat(fn, sB, eB) | ser # accelerated C version # _cat
if chunks: return (_catGenText(fn, sB, eB) | ser) if text else (_catGenBin(fn, sB, eB) | ser) # _cat
sB = wrap(fn, sB); eB = wrap(fn, eB) # _cat
if text: # _cat
with openFile(fn, True) as f: f.seek(sB); return f.read(eB-sB).splitlines() | ser # _cat
else: # _cat
with openFile(fn, False) as f: f.seek(sB); return f.read(eB-sB) | ser # _cat
def _jsF(self, meta): # _cat
"""Only supports get requests and JS objects that has async ._cat() function""" # _cat
fIdx = init._jsFAuto(); dataIdx = init._jsDAuto() # _cat
header, _fIdx, _async = k1lib.kast.asyncGuard(self.capturedSerial._jsF(meta)) # _cat
return f"""\
{header}
{fIdx} = async ({dataIdx}) => {{
if (typeof({dataIdx}) === "string") {{
const res = await fetch({dataIdx}, {{ method: "GET" }})
if (res.ok) return {'await ' if _async else ''}{_fIdx}(await res.text());
throw new Error(`Can't fetch ${{{dataIdx}}}: ${{res.status}} - ${{res.statusText}}`)
}}
return {'await ' if _async else ''}{_fIdx}(await {dataIdx}._cat());
}}""", fIdx # _cat
pass # _cat
class Profile(BaseCli): # Profile
def __init__(self, text): self.data = []; self.text = text # Profile
def __ror__(self, it): # Profile
fmt = k1lib.fmt; chars = 0; beginTime = time.time() # Profile
if self.text: # Profile
a, b, c, d, f = k1lib.ConstantPad.multi(5); every = settings.cat.every.text # Profile
for lines, e in enumerate(it): # Profile
chars += len(e) # Profile
if lines % every == 0: # every 1000 lines, print stuff out # Profile
elapsed = time.time() - beginTime#; self.data.append([lines, chars, elapsed]) # Profile
print(f"Current line: {fmt.item(lines) | a} ({fmt.item(lines/elapsed) | b} lines/s), current byte/chars: {fmt.size(chars) | c} ({fmt.size(chars/elapsed) | d}/s), elapsed: {fmt.time(elapsed) | f} ", end="\r") # Profile
yield e # Profile
else: # Profile
a, b, c = k1lib.ConstantPad.multi(3); every = settings.cat.every.binary # Profile
for i, e in enumerate(it): # Profile
chars += len(e) # Profile
if i % every == 0: # every 10 100000-byte chunks, print stuff out # Profile
elapsed = time.time() - beginTime#; self.data.append([chars, elapsed]) # Profile
print(f"Current size/chars: {fmt.size(chars) | a} ({fmt.size(chars/elapsed) | b}/s), elapsed: {fmt.time(elapsed) | c} ", end="\r") # Profile
yield e # Profile
def cat(fileName:str=None, text:bool=True, chunks:bool=None, profile:bool=False, sB=0, eB=-1): # cat
"""Reads a file line by line.
Example::
# display first 10 lines of file
cat("file.txt") | headOut()
# piping in also works
"file.txt" | cat() | headOut()
# read bytes from an image file and dumps it to another file
cat("img.png", False) | file("img2.png")
If you want to read only specific sections of the file, you can specify the
start (``sB``) and end byte (``eB``) like this::
"123456\\n89" | file("test/catTest.pth")
# returns ['3456', '8']
cat("test/catTest.pth", sB=2, eB=8) | deref()
settings.cli.cat.chunkSize = 3 # just for demonstration, don't do it normally
# returns [b'123', b'456', b'\\n8']
cat("test/catTest.pth", text=False, chunks=True, eB=8) | deref()
.. admonition:: Remote files
You can also read from urls directly, like this::
cat("https://k1lib.com/latest/") | deref()
For remote files like this, there are extra settings at :data:`~k1lib.settings`.cli.RemoteFile.
This will also read the file chunk by chunk if required. If the website doesn't support
partial downloads, then all of it will be downloaded and stored into ram, which may not be
desirable. The available settings are:
- timeout: seconds before killing the existing request
- retries: how many times to resend the request before giving up
If you are working with large files and would like to read 1 file from multiple
threads/processes, then you can use this cli in conjunction with :class:`splitSeek`.
If you are dumping multiple pickled objects into a single file, you can read all
of them using :meth:`cat.pickle`, which uses :class:`catPickle` underneath.
This cli has lots of settings at :data:`~k1lib.settings`.cli.cat
See also: :meth:`ls`
.. admonition:: Custom datatype
It is possible to build objects that can interoperate with this cli,
like this::
class custom1:
def __init__(self, config=None): ...
def _cat(self): return ["abc", "def"]
custom1() | cat() # returns ["abc", "def"]
custom1() | cat() | item() # returns "abc"
custom1() | (cat() | item()) # returns "abc"
When called upon, :meth:`cat` will see that the input is not a simple string, which
will prompt it to look for ``_cat()`` method of the complex object and execute it.
By default, if the user specifies any non-default arguments like ``text=False``,
it will error out because :meth:`cat` does not know how to handle it. Here's how
to do it right::
class custom2:
def __init__(self, config=None): ...
def _cat(self, kwargs): # default kwargs if user doesn't specify anything else is `{"text": True, "chunks": None, "sB": 0, "eB": -1}`
if kwargs["text"]: return ["abc", "def"]
else: return [b"abc", b"def"]
custom2() | cat() # returns ["abc", "def"]
custom2() | cat() | item() # returns "abc"
custom2() | (cat() | item()) # returns "abc"
custom2() | cat(text=False) # returns [b"abc", b"def"]
custom2() | cat(text=False) | item() # returns b"abc"
Here, you're saying that your function can handle non-standard arguments, so
:meth:`cat` will give you all the args. You may support only some arguments
and completely ignore others; it's all up to you. It might still be worth
throwing errors to warn users about arguments your custom datatype doesn't support.
You can also capture future clis like this::
class custom3:
def __init__(self, config=None): ...
def _cat(self, ser): # "ser" stands for "serial"
if len(ser.clis) == 1 and isinstance(ser.clis[0], item):
return "123" # fancy optimization
return ["abc", "def"] | ser # default "slow" base case
custom3() | cat() # returns ["abc", "def"]
custom3() | cat() | item() # returns "abc", because cat() did not capture any clis
custom3() | (cat() | item()) # returns "123", which might be desireable, up to you
This feature is pretty advanced, in that you can do different things based
on future processing tasks. Let's say that the captured cli looks like
``cut(3) | batched(10) | rItem(3)``. This essentially means "give me elements 30
through 39 from the 4th column". With this information, you can query your custom
database for exactly those elements only, while fetching nothing else, which would be
great for performance.
You can also outright lie to the user like in the example, where if :class:`~k1lib.cli.utils.item`
is detected, it will return a completely different version ("123") while it should return
"abc" instead. The possibilities are endless. As you can probably feel, it's super hard to
actually utilize this in the right way without breaking something, as you are completely
responsible for the captured clis. Let's see another example::
class custom4:
def __init__(self, config=None): ...
def _cat(self, ser):
return ["abc", "def"]
custom4() | cat() # returns ["abc", "def"]
custom4() | cat() | item() # returns "abc"
custom4() | (cat() | item()) # returns ["abc", "def"]
Same story as ``custom3``, demonstrating that if you declare that you want to see and
manipulate ``ser``, you'll be completely responsible for it, and if you don't handle it
correctly, it will create horrible bugs. You can, of course, have access to ``ser`` and
``kwargs`` at the same time::
class custom5:
def __init__(self, config=None): ...
def _cat(self, ser, kwargs): return ["abc", "def"] | ser
If your custom datatype feels like a block device, and you don't want to rewrite all
the functionalities in :meth:`cat`, you can implement the method ``_catHandle`` that
yields the custom file handle instead::
class custom6:
def __init__(self, config=None): ...
def _catHandle(self): yield io.BytesIO(b"abc\\ndef\\n123")
class custom7:
def __init__(self, config=None): ...
def _catHandle(self): yield io.StringIO("abc\\ndef\\n123")
custom6() | cat(text=False) # returns b'abc\\ndef\\n123'
custom7() | cat() | deref() # returns ['abc', 'def', '123']
Remember that you have to yield instead of returning the file handle. This is so that
you can use ``with`` statements, and if you return the file handle, the file might be
closed by the time :meth:`cat` decides to use it.
:param fileName: if None, then return a :class:`~k1lib.cli.init.BaseCli`
that accepts a file name and outputs Iterator[str]
:param text: if True, read text file, else read binary file
:param chunks: if True then reads the file chunk by chunk, else reads the
entire file. Defaults to True in text mode and False in binary mode
:param profile: whether to profile the file reading rate or not. Can adjust
printing frequency using `settings.cli.cat.every`
:param sB: "start byte". Specify this if you want to start reading from this byte
:param eB: "end byte", exclusive. Default -1 means end of file""" # cat
if chunks is None: chunks = True if text else False # dev note: if default arguments are changed, please also change _cat()'s implementation for custom datatypes # cat
if profile and not chunks: warnings.warn("Can't profile reading rate when you're trying to read everything at once"); profile = False # cat
f = _cat(text, chunks, sB, eB) # cat
if profile: f = f | Profile(text) # cat
return f if fileName is None else fileName | f # cat
k1lib.settings.cli.cat.add("pickle", k1lib.Settings(), "inp.cat.pickle() settings") # cat
class catPickle(BaseCli): # catPickle
def __init__(self, pickleModule=None): # catPickle
"""Reads a file as a series of pickled objects.
Example::
"ab" | aS(dill.dumps) | file("test/catTest.pth")
"cd" | aS(dill.dumps) >> file("test/catTest.pth") # append to the file
# returns ["ab", "cd"]
cat.pickle("test/catTest.pth") | deref()
# also returns ["ab", "cd"], same style as :class:`cat`
"test/catTest.pth" | cat.pickle() | deref()
The function ``cat.pickle`` internally uses this class, so don't worry about the discrepancy
in the examples. This relies on the trick that you can continuously dump pickled
objects to a single file and it would still work as usual::
[1, 2, 3] | aS(dill.dumps) | file("b.pth")
[4, 5] | aS(dill.dumps) >> file("b.pth")
with open("b.pth", "rb") as f:
objs = [dill.load(f), dill.load(f)]
assert objs == [[1, 2, 3], [4, 5]]
.. admonition:: tail() optimization
If you regularly append pickled objects to files, say you're trying to save
complex time series data, and you want to grab the most recent data, you
might do it like ``fn | cat.pickle() | tail(10)`` to get the last 10 objects.
But normally, this means that you have to read through the entire file in order
to just read the last bit of it. You can't exactly start close to the end as
you don't know where the object boundaries are. Luckily, I got fed up with this
so much that now when you do this: ``fn | (cat.pickle() | tail(10))`` (note the
parenthesis!), it will only read the end of the file! This optimization is mostly
airtight, but not entirely.
How it works is that cat.pickle() tries to unpickle the first object in the file
and grab the object's size in bytes. This info is used to make guesses as to where
it should start reading the file from, which is ``endByte - tail.n * object_size * 2``
Also, this optimization has only been tested on :mod:`dill` and :mod:`pickle`.
:mod:`cloudpickle` has been tested to not work (magic numbers are a little different).
I can potentially add in a mechanism for you to specify magic numbers for any pickler
you want, but it feels niche, so I'm going to skip that for now.
:param pickleModule: pickle module to use. Python's default is "pickle", but
I usually use :mod:`dill` because it's more robust""" # catPickle
super().__init__(capture=True); self.pm = pickleModule or dill # catPickle
def __ror__(self, fn): # catPickle
pm = self.pm # catPickle
def g(sB=0): # catPickle
with open(os.path.expanduser(fn), "rb") as f: # catPickle
f.seek(sB) # catPickle
try: # catPickle
while True: yield pm.load(f) # catPickle
except EOFError: pass # catPickle
if len(self.capturedClis) >= 1: # catPickle
c0 = self.capturedClis[0]; s = os.path.getsize(os.path.expanduser(fn)) # catPickle
if isinstance(c0, cli.tail) and s > 1e4 and isinstance(c0.n, int) and c0.n > 0: # if less than 10kB then it's not worth optimizing! # catPickle
# figures out the size of the first element to gauge the relative steps we need to take! # catPickle
factor = 1.5 # catPickle
while True: # catPickle
with open(os.path.expanduser(fn), "rb") as f: # catPickle
s1 = len(pm.dumps(pm.load(f)))*factor # catPickle
arr = list(g((fn | cli.splitSeek(c=b"\x80\x04\x95", ws=[s-c0.n*s1, c0.n*s1]))[1] - 1)) # catPickle
if c0.n < len(arr): return arr | self.capturedSerial # then execute the rest of the pipeline # catPickle
factor *= 1.5 # if guess too conservatively, then try again with a bigger factor! # catPickle
return g() | self.capturedSerial # catPickle
def _catPickle(fileName=None, pickleModule=dill): # _catPickle
"""Reads a file as a series of pickled objects. See :class:`catPickle`""" # _catPickle
return catPickle(pickleModule) if fileName is None else fileName | catPickle(pickleModule) # _catPickle
cat.pickle = _catPickle # _catPickle
class splitSeekRes: # splitSeekRes
def __init__(self, fn, res): # splitSeekRes
"""Return result of splitSeek(). We don't want to return just a list alone, because
we also want to pass along information like the file name""" # splitSeekRes
self.fn = fn; self.res = res # splitSeekRes
def __getitem__(self, idx): return self.res[idx] # splitSeekRes
def __len__(self): return len(self.res) # splitSeekRes
def __iter__(self): return iter(self.res) # splitSeekRes
def __repr__(self): return self.res.__repr__() # splitSeekRes
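# splitSeekRes quacks like a plain list of seek positions, but also carries the file name,
# so that downstream clis like refineSeek() know which file to re-open. Sketch, reusing
# the hypothetical file from splitSeek's docstring below:
#   res = "test/largeFile.txt" | splitSeek(4)
#   len(res); res[0]; list(res) # all behave like a normal list would
#   res.fn                      # "test/largeFile.txt"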
class splitSeek(BaseCli): # splitSeek
def __init__(self, n=None, c=b'\n', ws=None): # splitSeek
"""Splits a file up into n fragments aligned to the closest character and return the seek points.
Example::
# preparing the large file
range(120) | apply(lambda x: f"{x:3}_56789") | file("test/largeFile.txt")
# returns [0, 30, 70, 110, 150, 190, 230, 270, 300, 340]
"test/largeFile.txt" | splitSeek(31) | head()
# returns 32
"test/largeFile.txt" | splitSeek(31) | shape(0)
# returns 32, demonstrating you can also pipe file objects in, if that's what you want
open("test/largeFile.txt") | splitSeek(31) | shape(0)
# returns [0, 0, 10, 10, 20, 30, 30, 40, 40, 50], notice some segments have zero length
"test/largeFile.txt" | splitSeek(200) | head()
# returns [0, 400, 1200], demonstrating that you can split a file up unevenly by weights
"test/largeFile.txt" | splitSeek(ws=[1, 2]) | deref()
So, the generated file has 120 lines in total. Each line is 10 bytes (9 for
the string, and 1 for the new line character). Splitting the file into 31
fragments will result in 32 seek points (:math:`p_i\\quad i\\in[1, n+1]`).
Also notice how the returned seek position is not the position of the character
themselves, but the character right after. So if we're splitting by the new line
character, then it will return the next character after the new line. This is
by default because the majority of the time, I just want it to split by lines.
But you might have other ideas on how to use this in mind. Then just subtract 1
from all returned seek points.
You can then use these seek points to read the file in multiple threads/processes
using :meth:`cat`, like this::
# returns [[' 0_56789', ' 1_56789', ' 2_56789'], [' 3_56789', ' 4_56789', ' 5_56789', ' 6_56789']]
"test/largeFile.txt" | splitSeek(31) | window(2) | ~apply(lambda sB, eB: cat("test/largeFile.txt", sB=sB, eB=eB-1)) | head(2) | deref()
Because :math:`120/31\\approx4`, most of cat's reads contain 4 lines, but some
has 3 lines. Also notice that the lines smoothly transitions between cat's
reads (``2_56789`` to ``3_56789``), so that's pretty nice. Just like with :meth:`cat`,
this also works with urls::
"https://example.com" | splitSeek(10)
.. warning::
You have to really test whether reading the same file from multiple processes is
going to be really faster or not. If your data is stored in a HDD (aka hard drive,
with spinning disks), then it will actually slow you down (10x-100x), because the
disk will have to context switch all the time, and each switch has a 10ms cost.
You also have to take into account collecting together the results of all processes,
which can bottleneck the cpu. Read more about concurrency pitfalls at :class:`~k1lib.cli.modifier.applyMp`.
In some scenarios where you want to adjust the seek points even more, like when
you want to parse FASTA/FASTQ genome files, which have blocks of 2/4 lines each like this:
.. code-block:: text
@FP200005993L1C001R00100000061/2
TTTTAAACTTGCATTCTTTGGAGATTTGCTGAGTGTTGCTAGAGCTGGGAAACTTTTTTAATGAGATACGTGCATATTTTTCAAATTTACAGATCTTTTTTCACAAAAATAGAAAGTCATAAATGTGAAATGGAAACCTAAACAAGGCAA
+
GFEEEDEFGFFFFEFFFFFIFCEEEFFFGFFDFEGEDGFFFGDGFFGDGCE@GGGEEFDFGFGCFDFGGHCHFFFGFFFFFGEFDFFGHGFGEEHGFGEGFGFHFFEGFFFE;GEGEFGGHFFEI=GDAEDIFDDFGHFGEFGFEGGGGF
@FP200005993L1C001R00100000167/2
CTGGAATTTGGTATCTTATTGCCAAAGAATCTGTTTTGTGAAACTTGGGATCTCTATTTTAATGTTAATTCTGGTCAGTTGTGCCTAAACTCCATAAAGCAGGGACTATACTGAGGCGTATTCAATCTTCCTTCTTACCAAGGCCAGGAA
+
EEFECEDEFFCGFFFFFEEEGEGFEDECCEFEFDFEEFDFEDDFEFEEFDDFFEEFFEEFEFFHEEFEEFEFFDEFFFECF>FFFEFEDFCFFFEGFEDEEGDDFEEFEFGEEBD@EG>EEFFECEEGFEEEFFEDGEEEDE5EBDG:CC
Here, each block of 4 lines is (title, read, separator, quality). Because by default, this will
only split neatly along new lines, you will have to write extra functions to detect
if a particular seek point is desirable, and if not, either jump forward or backward
using :meth:`splitSeek.forward` and :meth:`splitSeek.backward`. To help with this,
:class:`refineSeek` has some useful methods that you might want to check out.
:param n: how many splits do you want?
:param c: block-boundary character, usually just the new line character
:param ws: weights. If given, the splits length ratios will roughly correspond to this""" # splitSeek
self.n = n; self.c = c; self.ws = ws # splitSeek
if ws is None and n is None: raise Exception("Specify at least n or ws for splitSeek to work") # splitSeek
@staticmethod # splitSeek
def forward(f, i:int, c=b'\n') -> int: # splitSeek
"""Returns char location after the search char, going forward.
Example::
f = io.BytesIO(b"123\\n456\\n789\\nabc")
f | splitSeek(2) # returns [0, 4, 15]
splitSeek.forward(f, 2) # returns 4
splitSeek.forward(f, 3) # returns 4
splitSeek.forward(f, 4) # returns 8
:param f: file handle
:param i: current seek point
:param c: block-boundary character""" # splitSeek
def inner(f): # splitSeek
f.seek(i) # splitSeek
while True: # splitSeek
b = f.tell(); s = f.read(1000); di = s.find(c) # splitSeek
if di > -1: return b + di + 1 # splitSeek
if s == "": return -1 # splitSeek
if isinstance(f, str): # splitSeek
with openFile(os.path.expanduser(f), False) as _f: return inner(_f) # splitSeek
else: return inner(f) # splitSeek
@staticmethod # splitSeek
def backward(f, i:int, c=b'\n') -> int: # splitSeek
"""Returns char location after the search char, going backward.
Example::
f = io.BytesIO(b"123\\n456\\n789\\nabc")
f | splitSeek(2) # returns [0, 4, 15]
splitSeek.backward(f, 5) # returns 4
splitSeek.backward(f, 4) # returns 4
splitSeek.backward(f, 3) # returns 0
:param f: file handle
:param i: current seek point
:param c: block-boundary character""" # splitSeek
def inner(f): # splitSeek
mul = 1 # splitSeek
while True: # splitSeek
begin = max(i-1000*mul, 0); end = max(i-1000*(mul-1), 0); mul += 1 # search range # splitSeek
f.seek(begin); b = f.tell(); s = f.read(end-begin); di = s.rfind(c) # splitSeek
if di > -1: return b + di + 1 # splitSeek
if b == 0: return 0 # splitSeek
if isinstance(f, str): # splitSeek
with openFile(os.path.expanduser(f), False) as _f: return inner(_f) # splitSeek
else: return inner(f) # splitSeek
def __ror__(self, fn): # why return splitSeekRes instead of the seek positions as a list directly? Because we want to pass along dependencies like file name to downstream processes like refineSeek # splitSeek
if isinstance(fn, str): fn = os.path.expanduser(fn) # splitSeek
n = self.n; c = self.c; ws = self.ws # splitSeek
def func(f): # f is a file-like object # splitSeek
f.seek(0, os.SEEK_END); end = f.tell() # splitSeek
if ws is None: begins = range(n) | cli.apply(lambda x: int(x*end/n)) # splitSeek
else: begins = range(end) | cli.splitW(*ws) | cli.apply(lambda x: x.start) # splitSeek
return [*begins | cli.apply(lambda x: splitSeek.backward(f, x, c)), end] # splitSeek
if isinstance(fn, str): # splitSeek
with openFile(os.path.expanduser(fn), False, True) as f: return splitSeekRes(fn, func(f)) # splitSeek
else: return splitSeekRes(fn, func(fn)) # splitSeek
class refineSeek(BaseCli): # refineSeek
def __init__(self, f=None, window=1): # refineSeek
"""Refines seek positions.
Example::
# returns list of integers for seek positions
"abc.txt" | splitSeek(30)
# returns refined seek positions, such that the line starting at the seek positions starts with "@"
"abc.txt" | splitSeek(30) | refineSeek(lambda x: x.startswith(b"@"))
# same thing as above
"abc.txt" | splitSeek(30) | refineSeek(lambda x: x[0] == b"@"[0])
# returns refined seek positions, such that 0th line starts with "@" and 2nd line starts with "+". This demonstrates `window` parameter
"abc.txt" | splitSeek(30) | refineSeek(lambda x: x[0][0] == b"@"[0] and x[2][0] == b"+"[0], 3)
# same thing as above, demonstrating some builtin refine seek functions
"abc.txt" | splitSeek(30) | refineSeek.fastq()
:param f: function that returns True if the current line/lines is a valid block boundary
:param window: by default (1), will fetch 1 line and check boundary using ``f(line)``.
If a value greater than 1 is passed (for example, 3), will fetch 3 lines and check
boundary using ``f([line1, line2, line3])``
""" # refineSeek
if f is None: f = lambda x: True # refineSeek
self.f = cli.fastF(f); self.window = window; self.fn = None # refineSeek
def __ror__(self, seeks): # refineSeek
f = self.f; window = self.window # refineSeek
def read(fio, sB:int): # refineSeek
fio.seek(sB) # refineSeek
if window == 1: return fio.readline() # refineSeek
return list(cli.repeatF(lambda: fio.readline(), window)) # refineSeek
if len(seeks) <= 2: return seeks # refineSeek
fn = self.fn or seeks.fn; newSeeks = [seeks[0]] # refineSeek
def process(fio): # fio is opened (or passed through as-is) by the caller below # refineSeek
for seek in seeks[1:-1]: # refineSeek
line = read(fio, seek) # refineSeek
while not f(line): seek = splitSeek.forward(fio, seek); line = read(fio, seek) # refineSeek
newSeeks.append(seek) # refineSeek
newSeeks.append(seeks[-1]); return newSeeks # refineSeek
if isinstance(fn, str): # refineSeek
with openFile(fn, False) as fio: return process(fio) # refineSeek
else: return process(fn) # refineSeek
def injectFn(self, fn): # refineSeek
"""Injects file name dependency, if this is not used right after :class:`splitSeek`""" # refineSeek
self.fn = fn; return self # refineSeek
@classmethod # refineSeek
def fastq(cls): # refineSeek
"""Refine fastq file's seek points""" # refineSeek
return cls(lambda x: x[0][0] == b"@"[0] and x[2][0] == b"+"[0], 3) # refineSeek
def wget(url:str, fileName:str=None, mkdir=True): # wget
"""Downloads a file. Also returns the file name, in case you want to pipe it
to something else.
:param url: The url of the file
:param fileName: if None, then tries to infer it from the url
:param mkdir: whether to make the directory leading up to the file or not""" # wget
if fileName is None: fileName = url.split("/")[-1] # wget
fileName = os.path.expanduser(fileName); dirname = os.path.dirname(fileName) # wget
if mkdir and dirname: os.makedirs(dirname, exist_ok=True) # dirname can be empty if fileName has no directory part # wget
try: urllib.request.urlretrieve(url, fileName); return fileName # wget
except Exception: return None # wget
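# Usage sketch (hypothetical url). Because wget() returns the file name, it pipes straight
# into other clis:
#   wget("https://example.com/data.csv") | cat() | headOut() # download, then view first lines
#   wget("https://example.com/data.csv", "~/ssd/data.csv")   # explicit destination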
def ls(folder:str=None): # ls
"""List every file and folder inside the specified folder.
Example::
# returns List[str]
ls("/home")
# same as above
"/home" | ls()
# only outputs files, not folders
ls("/home") | filt(os.path.isfile)
This can handle things that are not plain folders. For example,
it can handle zip file whereby it will list out all the files contained
within a particular .zip::
ls("abc.zip")
Then, you can use :meth:`cat` as usual, like this::
ls("abc.zip") | item() | cat()
Pretty nice!
.. admonition:: Custom datatype
It is possible to build objects that can interoperate with this cli,
like this::
class sql:
def __init__(self): ...
def _ls(self): return ["something", "here"]
ls() will identify that the input is not a string, and will try to
execute the object's "_ls()" method, so you can simply implement
it in your classes
""" # ls
if folder is None: return _ls() # ls
else: return folder | _ls() # ls
class _ls(BaseCli): # _ls
def _typehint(self, ignored=None): return tList(str) # _ls
def __ror__(self, path:str): # _ls
if isinstance(path, str): # _ls
path = os.path.expanduser(path.rstrip(os.sep)) # _ls
if os.path.exists(path): # _ls
if os.path.isfile(path): # _ls
if cat(path, False, eB=2) == b"PK": # list subfiles in a zip file # _ls
return [ZipWrapper(e, path) for e in zipfile.ZipFile(path).infolist()] # _ls
else: raise Exception(f"{path} is a file, not a folder, so can't list child directories") # _ls
else: return [f"{path}{os.sep}{e}" for e in os.listdir(path)] # _ls
else: return [] # _ls
else: return path._ls() # _ls
k1lib.settings.cli.add("quiet", False, "whether to mute extra outputs from clis or not") # _ls
newline = b'\n'[0] # _ls
class lazySt: # lazySt
def __init__(self, st, text:bool): # lazySt
"""Converts byte stream into lazy text/byte stream, with nice __repr__.""" # lazySt
self.st = st; self.text = text; # lazySt
def __iter__(self): # lazySt
if self.text: # lazySt
while True: # lazySt
line = self.st.readline() # lazySt
if len(line) == 0: break # lazySt
yield line.decode().rstrip("\n") # lazySt
else: # lazySt
while True: # lazySt
line = self.st.readline() # lazySt
if len(line) == 0: break # lazySt
yield line # lazySt
def __repr__(self): self | cli.stdout(); return "" # lazySt
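# Sketch: lazySt wraps a process' raw output pipe so it can be iterated lazily, either
# as decoded lines (text=True) or raw bytes (text=False):
#   p = subprocess.Popen("ls", shell=True, stdout=subprocess.PIPE)
#   for line in lazySt(p.stdout, True): print(line)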
def executeCmd(cmd:str, inp:bytes, text): # executeCmd
"""Runs a command, and returns stdout and stderr streams""" # executeCmd
p = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=k1lib.settings.wd) # executeCmd
if inp is not None: # executeCmd
if isinstance(inp, (str, bytes)): p.stdin.write(inp if isinstance(inp, bytes) else inp.encode()) # executeCmd
else: # executeCmd
for e in inp: # executeCmd
if not isinstance(e, (str, bytes)): e = str(e) # executeCmd
if not isinstance(e, bytes): e = e.encode() # executeCmd
p.stdin.write(e); p.stdin.write(b"\n") # executeCmd
p.stdin.close(); return p, lazySt(p.stdout, text), lazySt(p.stderr, text) # executeCmd
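# Plumbing sketch (normally you'd use cmd() below rather than calling this directly):
#   p, out, err = executeCmd("wc -l", b"a\nb\nc\n", text=True)
#   out | cli.deref() # returns ['3'] on Linux; stdout is yielded line by line as the process runs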
def printStderr(err): # printStderr
if not k1lib.settings.cli.quiet: # printStderr
e, it = err | cli.peek() # printStderr
if it != []: it | cli.insert("\nError encountered:\n") | cli.apply(k1lib.fmt.txt.red) | cli.stdout() # printStderr
def requireCli(cliTool:str): # requireCli
"""Searches for a particular cli tool (eg. "ls"), throws ImportError if not
found, else do nothing""" # requireCli
out, err = None | cmd(cliTool, mode=0) # requireCli
if (err | cli.shape(0)) > 0: raise ImportError(f"""Can't find cli tool {cliTool}. Please install it first.""") # requireCli
class cmd(BaseCli): # cmd
_forked_num_mode3 = 0 # cmd
def __init__(self, cmd:str, mode:int=1, text=True, block=False): # 0: return (stdout, stderr). 1: return stdout. 2: return stderr. 3: daemonize, return nothing # cmd
"""Runs a command, and returns the output line by line. Can pipe in some
inputs. If no inputs then have to pipe in :data:`None`. Example::
# return detailed list of files
None | cmd("ls -la")
# return list of files that ends with "ipynb"
None | cmd("ls -la") | cmd('grep ipynb$')
It might be tiresome to pipe in :data:`None` all the time. So, you can use ">"
operator to yield values right away::
# prints out first 10 lines of list of files
cmd("ls -la") > headOut()
If you're using Jupyter notebook/lab, then if you were to display a :class:`cmd`
object, it will print out the outputs. So, a single command ``cmd("mkdir")``
displayed at the end of a cell is enough to trigger creating the directory.
Reminder that ">" operator in here sort of has a different meaning to that of
:class:`~k1lib.cli.init.BaseCli`. So you kinda have to be careful about this::
# returns a serial cli, cmd not executed
cmd("ls -la") | deref()
# executes cmd with no input stream and pipes output to deref
cmd("ls -la") > deref()
# returns a serial cli
cmd("ls -la") > grep("txt") > headOut()
# executes pipeline
cmd("ls -la") > grep("txt") | headOut()
General advice is, right after a :class:`cmd`, use ">", and use "|" everywhere else.
Let's see a few more exotic examples. File ``a.sh``:
.. code-block:: bash
#!/bin/bash
echo 1; sleep 0.5
echo This message goes to stderr >&2
echo 2; sleep 0.5
echo $(</dev/stdin)
sleep 0.5; echo 3
Examples::
# returns [b'1\\n', b'2\\n', b'45\\n', b'3\\n'] and prints out the error message
"45" | cmd("./a.sh", text=False) | deref()
# returns [b'This message goes to stderr\\n']
"45" | cmd("./a.sh", mode=2, text=False) | deref()
# returns [[b'1\\n', b'2\\n', b'45\\n', b'3\\n'], [b'This message goes to stderr\\n']]
"45" | cmd("./a.sh", mode=0, text=False) | deref()
Performance-wise, stdout and stderr will yield values right away as soon
as the process outputs it, so you get real time feedback. However, this will
convert the entire input into a :class:`bytes` object, and not feed it bit by
bit lazily, so if you have a humongous input, it might slow you down a little.
Also, because stdout and stderr yield values right away, it means that if you
want the operation to be blocking until finished, you have to consume the output::
None | cmd("mkdir abc")
# might fail, because this might get executed before the previous line
None | cmd("echo a>abc/rg.txt")
None | cmd("mkdir abc") | ignore()
# will succeed, because this will be guaranteed to execute after the previous line
None | cmd("echo a>abc/rg.txt")
Settings:
- cli.quiet: if True, won't display errors in mode 1
:param mode: if 0, returns ``(stdout, stderr)``. If 1, returns ``stdout`` and prints
``stderr`` if there are any errors. If 2, returns ``stderr``. If 3, daemonize
the process, returns nothing
:param text: whether to decode the outputs into :class:`str` or return raw :class:`bytes`
:param block: whether to wait for the task to finish before returning to Python or not""" # cmd
super().__init__(); self.cmd = cmd; self.mode = mode # cmd
self.text = text; self.block = block; self.ro = k1lib.RunOnce() # cmd
def _typehint(self, ignored=None): # cmd
t = tIter(str) if self.text else tIter(bytes) # cmd
if self.mode == 0: return tCollection(t, t) # cmd
return t # cmd
def __ror__(self, it:Union[None, str, bytes, Iterator[Any]]) -> Iterator[Union[str, bytes]]: # cmd
"""Pipes in lines of input, or if there's nothing to
pass, then pass None""" # cmd
if self.mode == 3: # this needed so much time to get right, damn # cmd
if it is not None: raise Exception("cmd() has .mode = 3, which means it's executed as a daemon, so it should not take in any input from stdin. Do `None | cmd(..., mode=3)` instead") # cmd
cwd = k1lib.settings.wd # cmd
if os.fork() == 0: # cmd
with k1lib.ignoreWarnings(): subprocess.Popen(f"{self.cmd} >/dev/null 2>&1", shell=True, cwd=cwd, close_fds=True, preexec_fn=os.setpgrp).wait()#; subprocess.Popen(f'disown {p.pid}', shell=True) # cmd
else: cmd._forked_num_mode3 += 1; return None # cmd
else: # cmd
mode = self.mode # cmd
if not self.ro.done(): self.p, self.out, self.err = executeCmd(self.cmd, it, self.text) # cmd
if self.block: self.out = self.out | cli.deref(); self.err = self.err | cli.deref() # cmd
if mode == 0: return (self.out, self.err) # cmd
elif mode == 1: threading.Thread(target=lambda: printStderr(self.err)).start(); return self.out # cmd
elif mode == 2: return self.err # cmd
def __gt__(self, it): return None | self | it # cmd
def __repr__(self): return (None | self).__repr__() # cmd
class walk(BaseCli): # walk
def __init__(self, **kwargs): # walk
"""Recursively get all files inside a dictionary.
Example::
# prints out first 10 files
"." | walk() | headOut()""" # walk
self.kwargs = kwargs # walk
def __ror__(self, path): # walk
return os.walk(path, **self.kwargs) | ~cli.apply(lambda x, y, z: z | cli.apply(lambda e: x + os.sep + e)) | cli.joinStreams() # walk
def urlPath(base:str, host:bool=True): # urlPath
"""Translates from a url to a file path.
Example::
base = "~/ssd/some/other/path"
url = "http://example.com/some/path/to/file.txt"
url | urlPath(base) # returns "~/ssd/some/other/path/example_com/some/path/to/file.txt"
url | urlPath(base, False) # returns "~/ssd/some/other/path/some/path/to/file.txt"
:param base: base directory you want the files to be in""" # urlPath
base = base.rstrip("/\\") # urlPath
def inner(url): # urlPath
p = urllib.parse.urlparse(url) # urlPath
a = (p.netloc.replace(".", "_") + os.sep) if host else "" # urlPath
return f"{base}{os.sep}{a}{p.path.strip('/')}".replace("//", "/") # urlPath
return cli.aS(inner) # urlPath
import bz2, gzip, zlib # urlPath
class kzip(BaseCli): # kzip
def __init__(self, fmt="gz"): # kzip
"""Incrementally compresses a stream of data using gzip or bzip2.
Example::
data = range(100) | apply(lambda x: str(x).encode()) | deref() # list of bytes
data | kzip() | deref() # returns list of bytes
range(100) | apply(str) | kzip() | deref() # returns list of bytes, demonstrating piping iterator of string works
Quick reminder that if you pass in bytes iterator then it will be compressed as-is.
But if you pass in string iterator then it will append a new line character to each
string and then compress it. This is more intuitive, and it makes the style consistent
with :class:`~k1lib.cli.output.file`.
Passing in single string or byte is ok too::
"0123456789" | kzip() # returns gz bytes
b"0123456789" | kzip() # returns gz bytes, exact same pattern as above
Why "kzip" and not just "zip"? Because "zip" is a builtin python keyword.
See also: :class:`kunzip`
:param fmt: desired compressed format. Currently only supports "gz" or "bz2"
""" # kzip
fmt = fmt.strip(".") # kzip
if fmt == "gzip" or fmt == "gz": # kzip
self.o = zlib.compressobj(wbits=31) # kzip
elif fmt == "bzip2" or fmt == "bz2": # kzip
self.o = bz2.BZ2Compressor() # kzip
else: raise Exception(f"File type {fmt} not supported. Specify either 'gz' or 'bz2'") # kzip
def __ror__(self, it): # kzip
o = self.o # kzip
def gen(): # kzip
for e in it: # kzip
if isinstance(e, str): e = f"{e}\n".encode() # kzip
res = o.compress(e) # kzip
if res: yield res # kzip
try: # kzip
res = o.flush() # kzip
if res: yield res # kzip
except Exception: pass # the compressor may have been flushed already # kzip
if isinstance(it, str): it = it.encode() # kzip
if isinstance(it, bytes): return [o.compress(it), o.flush()] | cli.filt("x") | cli.aS(b"".join) # kzip
else: return gen() # kzip
class decompressobj: # .gz or .bz2 file # decompressobj
"""
Why not just use zlib.decompressobj() directly? I encountered a strange bug when trying
to decompress CommonCrawl's gzip files, posted on Stack Overflow and got help from none
other than Mark Adler, creator of gzip and zlib. That guy's super active on Stack Overflow.
Anyway, this code here is a modified version of his code. Here's the discussion:
https://stackoverflow.com/questions/76452480/pythons-zlib-doesnt-work-on-commoncrawl-file
""" # decompressobj
def __init__(self, gz=True): self.gz = gz # decompressobj
def __ror__(self, it): # decompressobj
gz = self.gz; data = left = b''; o = zlib.decompressobj(zlib.MAX_WBITS|32) if gz else bz2.BZ2Decompressor() # decompressobj
for got in it: # decompressobj
yield o.decompress(left + got); left = b'' # decompressobj
if o.eof: left = o.unused_data; o = zlib.decompressobj(zlib.MAX_WBITS|32) if gz else bz2.BZ2Decompressor() # decompressobj
if len(got) == 0 and len(left) == 0: break # decompressobj
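# Usage sketch: pipe in any iterator of compressed byte chunks, get decompressed chunks
# back, even across concatenated gzip members (the CommonCrawl case described above):
#   with open("data.gz", "rb") as f: # hypothetical file
#       data = iter(lambda: f.read(100_000), b"") | decompressobj(gz=True) | cli.aS(b"".join)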
stream_unzip = k1lib.dep("stream_unzip", url="https://pypi.org/project/stream-unzip/") # decompressobj
class decompressobjZip: # .zip files # decompressobjZip
def __ror__(self, it): # decompressobjZip
for name, size, chunks in stream_unzip.stream_unzip(it): yield from chunks; return # only decompresses the first subfile # decompressobjZip
class kunzip(BaseCli): # kunzip
def __init__(self, text=False): # kunzip
"""Incrementally decompress a stream of data using gzip or bzip2.
Example::
# returns an iterator of bytes. The cat() call can be shortened to cat("someFile.gz", False, True)
cat("someFile.gz", text=False, chunks=True) | unzip()
# incrementally fetches remote file, then incrementally unzips it
cat("https://example.com/someFile.gz", False, True) | unzip()
data = range(100) | apply(lambda x: str(x).encode()) | deref() # list of bytes
data | kzip() | unzip() | deref() # returns original data in list of bytes. May split the bytes at different positions though
How does it know which algorithm to pick to decompress? It looks at the
first few bytes of the file for its magic number. Also, if you're expecting
a text file after decompression, you can do this to get the lines directly::
# returns iterator of strings
cat("https://example.com/someFile.gz", False, True) | unzip(True)
One more thing. If you're planning to use this to download whole files, you might
want to tune the chunk size in :data:`~k1lib.settings`.cli.cat.chunkSize as
raising it might speed things up considerably. Or maybe you should just
use :meth:`wget` instead =))
More examples of the different styles to use this cli::
["abc"] | kzip() | unzip(True) | deref() # returns ["abc"]
"abc" | kzip() | unzip(True) # returns "abc"
"abc" | kzip() | unzip() # returns b"abc"
See also: :class:`kzip`.
.. admonition:: Reading files directly
The original purpose of this cli is to incrementally decompress a byte stream.
But a lot of time, that byte stream is coming from a file, and typing out
``cat(fn, False, True)`` to generate a byte stream to feed into this cli is
tedious, so instead, you can pipe in the file name and this will just unzip
it and return the string/byte stream to you::
"abc.gz" | unzip() # returns string iterator
"abc.bz2" | unzip() # same
"abc.zip" | unzip() # returns string iterator from the first subfile only. All subsequent subfiles are ignored
.. admonition:: .zip files
This can also work with subfiles within .zip files, like this::
"abc.zip" | unzip() # unzips the first subfile
"abc.zip" | ls() # lists out all subfiles
"abc.zip" | ls() | rItem(2) | unzip() # unzips the 3rd subfile
:param text: whether to yield string lines or bytes""" # kunzip
self.text = text # kunzip
def __ror__(self, it): # kunzip
def gen(it): # generates a stream of bytes # kunzip
it = iter(it); e = next(it) # kunzip
if e[:2] == b"\x1f\x8b": o = decompressobj(True) # .gz # kunzip
elif e[:3] == b"BZh": o = decompressobj(False) # .bz2 # kunzip
elif e[:2] == b"PK": o = decompressobjZip() # .zip # kunzip
else: raise Exception("Can't infer the file type (whether gz or bz2) of this file") # kunzip
yield from [[e], it] | cli.joinStreams() | o # kunzip
single = False # kunzip
if isinstance(it, str): return cat(it, False, True) | kunzip(self.text) # special case for reading file names directly # kunzip
if isinstance(it, ZipWrapper): return it | cat(text=self.text, chunks=True) # special case for .zip subfile # kunzip
if isinstance(it, bytes): single = True; it = [it] # kunzip
if self.text: # kunzip
def gen2(it): # generates a stream of strings # kunzip
last = b"" # last split is probably not the right line boundary, so gotta do this # kunzip
for e in gen(it): # kunzip
e = last + e; splits = e.split(b"\n") # kunzip
for line in splits[:-1]: yield line.decode() # kunzip
last = splits[-1] # kunzip
yield last.decode() # kunzip
res = gen2(it) # kunzip
else: res = gen(it) # kunzip
if not single: return res # kunzip
else: return "\n".join(res) if self.text else b"".join(res) # kunzip
unzip = kunzip # kunzip