Source code for k1lib.cli.inp

# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
"""This module for tools that will likely start the processing stream."""
from typing import Iterator, Union, Any, List
import k1lib, urllib, subprocess, warnings, os, threading, time, math, io, dill, validators
from collections import deque
from k1lib.cli import BaseCli, init; import k1lib.cli as cli
from k1lib.cli.typehint import *
from contextlib import contextmanager
requests = k1lib.dep.requests
try: import minio; hasMinio = True
except ImportError: hasMinio = False
__all__ = ["cat", "catPickle", "splitSeek", "refineSeek", "wget", "ls", "cmd", "walk", "urlPath", "kzip", "kunzip", "unzip"]
settings = k1lib.settings.cli
class NoPartialContent(Exception): pass                                          # NoPartialContent
def getChunk(url:str, sB:int, eB:int, timeout:float, retries:int) -> bytes: # start inclusive, end exclusive # getChunk
    for i in range(retries):                                                     # getChunk
        try: res = requests.get(url, headers={"Range": f"bytes={sB}-{eB-1}"}, timeout=timeout) # getChunk
        except Exception as e:                                                   # getChunk
            if i >= retries-1: raise Exception(f"Can't get file chunk")          # getChunk
            continue                                                             # getChunk
        if res.status_code != 206: raise NoPartialContent(f"Server doesn't allow partial downloads at this particular url. Status code: {res.status_code}") # getChunk
        return res.content                                                       # getChunk
def getChunks(url:str, sB:int, eB:int, chunkSize=None, chunkTimeout:float=10, chunkRetries:int=10) -> List[bytes]: # getChunks
    """Grabs bytes from sB to eB in chunks"""                                    # getChunks
    chunkSize = chunkSize or settings.cat.chunkSize # `settings` is already k1lib.settings.cli # getChunks
    return range(sB, eB+1) | cli.batched(chunkSize, True) | cli.apply(lambda r: getChunk(url, r.start, r.stop, chunkTimeout, chunkRetries)) # r.stop is already exclusive, matching getChunk's convention # getChunks
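# Illustrative sketch (not part of the original module): getChunks() streams a byte
# range as partial downloads. The url below is a placeholder, and this assumes the
# server honors HTTP Range requests:
#
#   chunks = getChunks("https://example.com/big.bin", sB=0, eB=999_999, chunkSize=100_000)
#   data = b"".join(chunks)  # reassembles the ~1MB span from ~10 partial downloads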
catSettings = k1lib.Settings().add("chunkSize", 100000, "file reading chunk size for binary+chunk mode. Decrease it to avoid wasting memory and increase it to avoid disk latency") # getChunks
catSettings.add("every", k1lib.Settings().add("text", 1000, "for text mode, will print every n lines").add("binary", 10, "for binary mode, will print every n 100000-byte blocks"), "profiler print frequency") # getChunks
settings.add("cat", catSettings, "inp.cat() settings")                           # getChunks
                                                                                 # getChunks
rfS = k1lib.Settings()                                                           # getChunks
settings.add("RemoteFile", rfS, "inp.RemoteFile() settings, used in cat(), splitSeek() and the like") # getChunks
rfS.add("memoryLimit", 100_000_000, "if the internal cache exceeds this limit (in bytes), and randomAccess is False, then old downloaded chunks will be deleted") # getChunks
rfS.add("timeout", 10, "seconds before terminating the remote request and retrying") # getChunks
rfS.add("retries", 10, "how many times to retry sending the request before giving up") # getChunks
def noPartial(url, *args):                                                       # noPartial
    try: return len(getChunk(url, 0, 10, *args)) != 10                           # noPartial
    except NoPartialContent: return True                                         # noPartial
class RemoteFile:                                                                # RemoteFile
    def __init__(self, url, randomAccess=True, blockSize=None, noPartialConfirm=False, timeout:float=None, retries:int=None): # RemoteFile
        """
:param url: url of the remote file
:param randomAccess: is random accessing parts of the file expected? If
    True, then keeps all of the reads in ram internally, else free them
    as soon as possible
:param blockSize: all reads will fetch roughly this amount of bytes"""           # RemoteFile
        self.url = url; self.randomAccess = randomAccess; self.blockSize = blockSize or settings.cat.chunkSize # RemoteFile
        self.noPartialConfirm = noPartialConfirm; self.size = None; self.domain = k1lib.Domain() # RemoteFile
        self.seekPos = 0; self.reads = deque() # List[sB, eB, content]           # RemoteFile
        self._confirmMsgShown = False; self.timeout = timeout or rfS.timeout; self.retries = retries or rfS.retries # RemoteFile
        self._totalReadSize = 0; self.noPartial = noPartial(url, self.timeout, self.retries) # RemoteFile
    def _fetch(self, sB:int, eB:int): # fetches from start to end byte and dumps to internal memory. Inclusive start and end byte # RemoteFile
        if not self.noPartial:                                                   # RemoteFile
            eB = max(eB, min(sB+self.blockSize, len(self)))                      # RemoteFile
            chunk = getChunk(self.url, sB, eB, self.timeout, self.retries)       # RemoteFile
        else:                                                                    # RemoteFile
            if self.noPartialConfirm and not self._confirmMsgShown:              # RemoteFile
                ans = input(f"""Remote file '{self.url}' don't support partial downloads.
Therefore the entire file will be loaded into RAM, which
could be undesireable. Do you want to continue? Y/n: """)                        # RemoteFile
                self._confirmMsgShown = True                                     # RemoteFile
                if ans.lower()[0] != "y": self.reads.append([0, 0, b""]); return # RemoteFile
            sB = 0; chunk = requests.get(self.url).content; eB = len(chunk)      # RemoteFile
        self.reads.append([sB, eB, chunk])                                       # RemoteFile
        self._totalReadSize += len(chunk); self.domain = self.domain + k1lib.Domain([sB, eB]) # RemoteFile
        if not self.randomAccess and self._totalReadSize > rfS.memoryLimit: # deletes old reads # RemoteFile
            sB, eB, chunk = self.reads.popleft()                                 # RemoteFile
            self._totalReadSize -= len(chunk)                                    # RemoteFile
    def _ensureRange(self, sB, eB): # makes sure that all ranges between sB and eB are available # RemoteFile
        missingDomain = k1lib.Domain([max(sB-30, 0), min(eB+30, len(self))]) & -self.domain # RemoteFile
        for sB, eB in missingDomain.ranges: self._fetch(sB, eB)                  # RemoteFile
    def _readChunks(self, sB, eB): # read from sB to eB, but in chunks, to be optimized. inclusive sB, exclusive eB # RemoteFile
        sB = max(min(sB, len(self)), 0); eB = max(min(eB, len(self)), 0); self._ensureRange(sB, eB) # RemoteFile
        return self.reads | cli.filt(~cli.aS(lambda s,e,chunk: e>=sB and s<=eB)) | cli.sort() | ~cli.apply(lambda s,e,chunk: chunk[max(sB-s, 0):len(chunk)+min(eB-e, 0)]) | cli.filt(len) # RemoteFile
    def seek(self, cookie, whence=0):                                            # RemoteFile
        if whence == 0: self.seekPos = cookie                                    # RemoteFile
        elif whence == 1: self.seekPos += cookie                                 # RemoteFile
        elif whence == 2: self.seekPos = len(self) + cookie                      # RemoteFile
        else: raise Exception("Invalid whence")                                  # RemoteFile
        return self.seekPos                                                      # RemoteFile
    def read(self, size, join=True):                                             # RemoteFile
        chunks = self._readChunks(self.seekPos, self.seekPos + size)             # RemoteFile
        self.seekPos += size; return b"".join(chunks) if join else chunks        # RemoteFile
    def readline(self, newLine=True):                                            # RemoteFile
        ans = []; seekPos = self.seekPos                                         # RemoteFile
        # SyntaxError is used purely as a "break out of nested loops" signal here # RemoteFile
        try:                                                                     # RemoteFile
            while self.seekPos < len(self):                                      # RemoteFile
                for chunk in self.read(self.blockSize, False):                   # RemoteFile
                    if len(chunk) == 0: raise SyntaxError()                      # RemoteFile
                    ans.append(chunk)                                            # RemoteFile
                    if b"\n" in chunk: raise SyntaxError()                       # RemoteFile
        except SyntaxError: pass                                                 # RemoteFile
        ans = b"".join(ans)                                                      # RemoteFile
        try: n = ans.index(b"\n")                                                # RemoteFile
        except ValueError: n = len(ans) # only happens at end of file            # RemoteFile
        self.seekPos = seekPos + n+1; return (ans[:n+1] if newLine else ans[:n]).decode() # RemoteFile
    def readlines(self, newLine=True):                                           # RemoteFile
        while True:                                                              # RemoteFile
            yield self.readline(newLine)                                         # RemoteFile
            if self.seekPos >= len(self): break                                  # RemoteFile
    def tell(self): return self.seekPos                                          # RemoteFile
    def _getSize(self):                                                          # RemoteFile
        if self.noPartial: self._fetch(0, 10); return self.reads[0][1]           # RemoteFile
        for i in range(self.retries):                                            # RemoteFile
            try: return requests.head(self.url, timeout=self.timeout).headers.items() | cli.apply(cli.op().lower(), 0) | cli.toDict() | cli.op()["content-length"].ab_int() # RemoteFile
            except Exception as e:                                               # RemoteFile
                if i >= self.retries - 1: raise Exception(f"Can't get size of remote file: {e}") # last attempt, give up # RemoteFile
    def __len__(self):                                                           # RemoteFile
        if self.size is None: self.size = self._getSize()                        # RemoteFile
        return self.size                                                         # RemoteFile
    def __repr__(self): return f"<RemoteFile url={self.url} size={k1lib.fmt.size(len(self))}>" # RemoteFile
import zipfile, inspect                                                          # RemoteFile
class ZipWrapper:                                                                # ZipWrapper
    def __init__(self, a, zfn): self.a = a; self.zfn = zfn                       # ZipWrapper
    def __repr__(self):                                                          # ZipWrapper
        a = self.a; s = f" ({round(a.compress_size/a.file_size*100)}%)" if a.file_size > 0 else "" # ZipWrapper
        return f"<Zip subfile name='{a.filename}' {k1lib.fmt.size(a.file_size)} -> {k1lib.fmt.size(a.compress_size)}{s}>" # ZipWrapper
    @property                                                                    # ZipWrapper
    def size(self): return self.a.file_size                                      # ZipWrapper
    @property                                                                    # ZipWrapper
    def compressedSize(self): return self.a.compress_size                        # ZipWrapper
    def _catHandle(self):                                                        # ZipWrapper
        with zipfile.ZipFile(self.zfn) as zipf:                                  # ZipWrapper
            with zipf.open(self.a.filename) as subfile:                          # ZipWrapper
                yield subfile                                                    # ZipWrapper
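# Illustrative sketch: ZipWrapper objects are produced by ls() on a .zip file
# (defined further down), and cat() can read them through their _catHandle() method.
# "abc.zip" is a placeholder file name:
#
#   ls("abc.zip")                   # returns [<Zip subfile ...>, ...]
#   ls("abc.zip") | item() | cat()  # reads lines of the first subfile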
@contextmanager                                                                  # ZipWrapper
def openFile(fn, text, noPartialConfirm=False): # can be actual file or url      # openFile
    if not isinstance(fn, str):                                                  # openFile
        if hasattr(fn, "_catHandle"): yield from fn._catHandle(); return # custom datatype case # openFile
        else: yield fn; return # file handle case, just return itself            # openFile
    if os.path.exists(fn):                                                       # openFile
        if text:                                                                 # openFile
            with open(fn, "r", settings.cat.chunkSize) as f: yield f             # openFile
        else:                                                                    # openFile
            with open(fn, "rb", settings.cat.chunkSize) as f: yield f            # openFile
    elif validators.url(fn) is True:                                             # openFile
        yield RemoteFile(fn, False, noPartialConfirm=noPartialConfirm)           # openFile
    else: raise FileNotFoundError(f"The file {fn} doesn't seem to exist and/or it's not a valid url") # openFile
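# Quick sketch of openFile()'s dispatch (illustrative, names are placeholders):
#
#   with openFile("data.txt", text=True) as f: ...                    # local file -> builtin handle
#   with openFile("https://example.com/a.txt", text=False) as f: ...  # url -> RemoteFile
#   with openFile(io.BytesIO(b"abc"), text=False) as f: ...           # handle -> yielded as-is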
def _catGenText(fn, sB, eB): # fn for "file name"                                # _catGenText
    try:                                                                         # _catGenText
        if sB == 0 and eB == -1: # fast path without bounds (90-160 MB/s expected) # _catGenText
            with openFile(fn, True) as f:                                        # _catGenText
                line = f.readline()                                              # _catGenText
                if isinstance(line, str): # why put the if outside the loop? Speed reasons. Also, isn't .readline() supposed to return a string? Well, because we're supporting random file handles from custom datatypes, some ill-behaved file handles might return bytes instead # _catGenText
                    while True:                                                  # _catGenText
                        if line == "": return                                    # _catGenText
                        yield line[:-1] if line[-1] == "\n" else line            # _catGenText
                        line = f.readline()                                      # _catGenText
                else:                                                            # _catGenText
                    while True:                                                  # _catGenText
                        line = line.decode()                                     # _catGenText
                        if line == "": return                                    # _catGenText
                        yield line[:-1] if line[-1] == "\n" else line            # _catGenText
                        line = f.readline()                                      # _catGenText
        else: # slow path with bounds (15 MB/s expected). Update: much faster now, expect only 40% slower than the path above # _catGenText
            sB = wrap(fn, sB); eB = wrap(fn, eB)                                 # _catGenText
            with openFile(fn, True) as f:                                        # _catGenText
                f.seek(sB); b = sB # current byte                                # _catGenText
                line = f.readline()                                              # _catGenText
                if isinstance(line, str):                                        # _catGenText
                    while True:                                                  # _catGenText
                        b += len(line)                                           # _catGenText
                        if len(line) == 0: return                                # _catGenText
                        if b > eB: yield line[:len(line)-(b-eB)]; return         # _catGenText
                        yield line[:-1] if line[-1] == "\n" else line            # _catGenText
                        line = f.readline()                                      # _catGenText
                else:                                                            # _catGenText
                    while True:                                                  # _catGenText
                        line = line.decode()                                     # _catGenText
                        b += len(line)                                           # _catGenText
                        if len(line) == 0: return                                # _catGenText
                        if b > eB: yield line[:len(line)-(b-eB)]; return         # _catGenText
                        yield line[:-1] if line[-1] == "\n" else line            # _catGenText
                        line = f.readline()                                      # _catGenText
    except FileNotFoundError: pass                                               # _catGenText
def _catGenBin(fn, sB, eB):                                                      # _catGenBin
    chunkSize = settings.cat.chunkSize; sB = wrap(fn, sB); eB = wrap(fn, eB); nB = eB - sB # number of bytes to read total # _catGenBin
    with openFile(fn, False) as f:                                               # _catGenBin
        f.seek(sB); nChunks = math.ceil(nB / chunkSize); lastChunkSize = nB - chunkSize*(nChunks-1) # _catGenBin
        # had to do this because RemoteFile is actually not thread-safe          # _catGenBin
        # applyF = (lambda f_: cli.apply(f_)) if isinstance(f, RemoteFile) else (lambda f_: cli.applyTh(f_, prefetch=10)) # _catGenBin
        applyF = (lambda f_: cli.apply(f_)) # actually, normal reads are not thread-safe either. Stupid me # _catGenBin
        yield from range(nChunks) | applyF(lambda i: f.read(chunkSize) if i < nChunks-1 else f.read(chunkSize)[:lastChunkSize]) # _catGenBin
def fileLength(fn):                                                              # fileLength
    with openFile(fn, False) as f: return f.seek(0, os.SEEK_END)                 # fileLength
def wrap(fn, b): return b if b >= 0 else b + fileLength(fn) + 1                  # wrap
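# Worked example (illustrative): wrap() resolves python-style negative byte indices.
# For a 100-byte file, wrap(fn, -1) == -1 + 100 + 1 == 100, i.e. eB=-1 means
# "end of file", while wrap(fn, 0) == 0 stays untouched.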
class _cat(BaseCli):                                                             # _cat
    def __init__(self, text, chunks, sB, eB):                                    # _cat
        super().__init__(capture=True)                                           # _cat
        self.text = text; self.chunks = chunks; self.sB = sB; self.eB = eB       # _cat
    def _typehint(self, ignored=None):                                           # _cat
        if self.text: return tIter(str) if self.chunks else tList(str)           # _cat
        else: return tIter(bytes) if self.chunks else bytes                      # _cat
    def __ror__(self, fn:Union[str, "fileHandle"]) -> Union[Iterator[str], bytes]: # _cat
        ser = self.capturedSerial; text = self.text; chunks = self.chunks; sB = self.sB; eB = self.eB # _cat
        if not isinstance(fn, str) and hasattr(fn, "_cat"): # custom datatype, _cat method defined, so it will take control of things # _cat
            kwargs = {"text": text, "chunks": chunks, "sB": sB, "eB": eB}        # _cat
            kwdiff = [a for a, b in [["text",text!=True], ["chunks",chunks!=True if text else chunks!=False], ["sB",sB!=0], ["eB",eB!=-1]] if b] # _cat
            args = inspect.getfullargspec(fn._cat).args[1:]; n = len(args)       # _cat
            s = set(["ser", "kwargs"]); weirdArgs = [a for a in args if a not in s] # _cat
            if len(weirdArgs) > 0: raise Exception(f"Custom datatype `{type(fn)}` has ._cat() method, which expects only `ser` and `kwargs` arguments, but detected these arguments instead: {weirdArgs}. Please fix `{type(fn)}`") # _cat
            def guard():                                                         # _cat
                if kwdiff: raise Exception(f"Custom datatype `{type(fn)}` does not support custom cat() arguments like {kwdiff}") # _cat
            if n == 0: guard(); return fn._cat() | ser                           # _cat
            elif n == 1:                                                         # _cat
                if args[0] == "ser": guard(); return fn._cat(ser)                # _cat
                if args[0] == "kwargs": return fn._cat(kwargs) | ser             # _cat
            elif n == 2:                                                         # _cat
                if args[0] == "ser": return fn._cat(ser, kwargs)                 # _cat
                else: return fn._cat(kwargs, ser)                                # _cat
            else: raise Exception("Unreachable")                                 # _cat
        fn = os.path.expanduser(fn) if isinstance(fn, str) else fn               # _cat
        if text and chunks and k1lib._settings.packages.k1a and isinstance(fn, str) and os.path.exists(fn): # _cat
            return k1lib._k1a.k1a.StrIterCat(fn, sB, eB) | ser # accelerated C version # _cat
        if chunks: return (_catGenText(fn, sB, eB) | ser) if text else (_catGenBin(fn, sB, eB) | ser) # _cat
        sB = wrap(fn, sB); eB = wrap(fn, eB)                                     # _cat
        if text:                                                                 # _cat
            with openFile(fn, True) as f: f.seek(sB); return f.read(eB-sB).splitlines() | ser # _cat
        else:                                                                    # _cat
            with openFile(fn, False) as f: f.seek(sB); return f.read(eB-sB) | ser # _cat
    def _jsF(self, meta):                                                        # _cat
        """Only supports get requests and JS objects that has async ._cat() function""" # _cat
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()                        # _cat
        header, _fIdx, _async = k1lib.kast.asyncGuard(self.capturedSerial._jsF(meta)) # _cat
        return f"""\
{header}
{fIdx} = async ({dataIdx}) => {{
    if (typeof({dataIdx}) === "string") {{
        const res = await fetch({dataIdx}, {{ method: "GET" }})
        if (res.ok) return {'await ' if _async else ''}{_fIdx}(await res.text());
        throw new Error(`Can't fetch ${{{dataIdx}}}: ${{res.status}} - ${{res.statusText}}`)
    }}
    return {'await ' if _async else ''}{_fIdx}(await {dataIdx}._cat());
}}""", fIdx                                                                      # _cat
class Profile(BaseCli):                                                          # Profile
    def __init__(self, text): self.data = []; self.text = text                   # Profile
    def __ror__(self, it):                                                       # Profile
        fmt = k1lib.fmt; chars = 0; beginTime = time.time()                      # Profile
        if self.text:                                                            # Profile
            a, b, c, d, f = k1lib.ConstantPad.multi(5); every = settings.cat.every.text # Profile
            for lines, e in enumerate(it):                                       # Profile
                chars += len(e)                                                  # Profile
                if lines % every == 0: # every 1000 lines, print stuff out       # Profile
                    elapsed = time.time() - beginTime#; self.data.append([lines, chars, elapsed]) # Profile
                    print(f"Current line: {fmt.item(lines) | a} ({fmt.item(lines/elapsed) | b} lines/s), current byte/chars: {fmt.size(chars) | c} ({fmt.size(chars/elapsed) | d}/s), elapsed: {fmt.time(elapsed) | f}                                 ", end="\r") # Profile
                yield e                                                          # Profile
        else:                                                                    # Profile
            a, b, c = k1lib.ConstantPad.multi(3); every = settings.cat.every.binary # Profile
            for i, e in enumerate(it):                                           # Profile
                chars += len(e)                                                  # Profile
                if i % every == 0: # every 10 100000-byte chunks, print stuff out # Profile
                    elapsed = time.time() - beginTime#; self.data.append([chars, elapsed]) # Profile
                    print(f"Current size/chars: {fmt.size(chars) | a} ({fmt.size(chars/elapsed) | b}/s), elapsed: {fmt.time(elapsed) | c}                                 ", end="\r") # Profile
                yield e                                                          # Profile
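# Illustrative usage of the profiler (file names are placeholders): profiling only
# works in chunk mode, and the stream must actually be consumed for stats to print:
#
#   cat("big.txt", profile=True) | cli.ignore()                           # text mode, prints lines/s
#   cat("big.bin", text=False, chunks=True, profile=True) | cli.ignore()  # binary mode, prints bytes/s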
[docs]def cat(fileName:str=None, text:bool=True, chunks:bool=None, profile:bool=False, sB=0, eB=-1): # cat
    """Reads a file line by line. Example::

    # display first 10 lines of file
    cat("file.txt") | headOut()
    # piping in also works
    "file.txt" | cat() | headOut()
    # read bytes from an image file and dump it to another file
    cat("img.png", False) | file("img2.png")

If you want to read only specific sections of the file, you can specify the
start (``sB``) and end byte (``eB``) like this::

    "123456\\n89" | file("test/catTest.pth")
    # returns ['3456', '8']
    cat("test/catTest.pth", sB=2, eB=8) | deref()

    settings.cat.chunkSize=3 # just for demonstration, don't do it normally
    # returns [b'123', b'456', b'\\n8']
    cat("test/catTest.pth", text=False, chunks=True, eB=8) | deref()

.. admonition:: Remote files

    You can also read from urls directly, like this::

        cat("https://k1lib.com/latest/") | deref()

    For remote files like this, there are extra settings at
    :data:`~k1lib.settings`.cli.RemoteFile. This will also read the file chunk
    by chunk if required. If the website doesn't support partial downloads,
    then all of it will be downloaded and stored into ram, which may not be
    desirable. The available settings are:

    - timeout: seconds before killing the existing request
    - retries: how many times to resend the request before giving up

If you are working with large files and would like to read 1 file from multiple
threads/processes, then you can use this cli in conjunction with :class:`splitSeek`.

If you are dumping multiple pickled objects into a single file, you can read all
of them using :meth:`cat.pickle`, which uses :class:`catPickle` underneath.

This cli has lots of settings at :data:`~k1lib.settings`.cli.cat

See also: :meth:`ls`

.. admonition:: Custom datatype

    It is possible to build objects that can interoperate with this cli, like this::

        class custom1:
            def __init__(self, config=None): ...
            def _cat(self): return ["abc", "def"]

        custom1() | cat()            # returns ["abc", "def"]
        custom1() | cat() | item()   # returns "abc"
        custom1() | (cat() | item()) # returns "abc"

    When called upon, :meth:`cat` will see that the input is not a simple
    string, which will prompt it to look for the ``_cat()`` method of the
    complex object and execute it. By default, if the user specifies any
    non-default arguments like ``text=False``, it will error out, because
    :meth:`cat` does not know how to handle it. Here's how to do it right::

        class custom2:
            def __init__(self, config=None): ...
            def _cat(self, kwargs):
                # default kwargs if user doesn't specify anything else is `{"text": True, "chunks": None, "sB": 0, "eB": -1}`
                if kwargs["text"]: return ["abc", "def"]
                else: return [b"abc", b"def"]

        custom2() | cat()                    # returns ["abc", "def"]
        custom2() | cat() | item()           # returns "abc"
        custom2() | (cat() | item())         # returns "abc"
        custom2() | cat(text=False)          # returns [b"abc", b"def"]
        custom2() | cat(text=False) | item() # returns b"abc"

    Here, you're saying that your function can handle non-standard arguments,
    so :meth:`cat` will give you all the args. You may support only some
    arguments and completely ignore others, it's all up to you. It might still
    be worth it to throw some errors warning users of arguments your custom
    datatype does not support. You can also capture future clis like this::

        class custom3:
            def __init__(self, config=None): ...
            def _cat(self, ser): # "ser" stands for "serial"
                if len(ser.clis) == 1 and isinstance(ser.clis[0], item): return "123" # fancy optimization
                return ["abc", "def"] | ser # default "slow" base case

        custom3() | cat()            # returns ["abc", "def"]
        custom3() | cat() | item()   # returns "abc", because cat() did not capture any clis
        custom3() | (cat() | item()) # returns "123", which might be desirable, up to you

    This feature is pretty advanced actually, in that you can do different
    things based on future processing tasks. Let's say that the captured cli
    looks like ``cut(3) | batched(10) | rItem(3)``. This essentially means
    "give me elements 30 through 39 from the 4th column". With this
    information, you can query your custom database for exactly those elements
    only, while fetching nothing else, which would be great for performance.
    You can also outright lie to the user like in the example, where if
    :class:`~k1lib.cli.utils.item` is detected, it will return a completely
    different version ("123") while it should return "abc" instead. The
    possibilities are endless. As you can probably feel, it's super hard to
    utilize this in the right way without breaking something, as you are
    completely responsible for the captured clis. Let's see another example::

        class custom4:
            def __init__(self, config=None): ...
            def _cat(self, ser): return ["abc", "def"]

        custom4() | cat()            # returns ["abc", "def"]
        custom4() | cat() | item()   # returns "abc"
        custom4() | (cat() | item()) # returns ["abc", "def"]

    Same story as ``custom3``, demonstrating that if you declare that you want
    to see and manipulate ``ser``, you'll be completely responsible for it, and
    if you don't handle it correctly, it will create horrible bugs. You can, of
    course, have access to ``ser`` and ``kwargs`` at the same time::

        class custom5:
            def __init__(self, config=None): ...
            def _cat(self, ser, kwargs): return ["abc", "def"] | ser

    If your custom datatype feels like a block device, and you don't want to
    rewrite all the functionalities in :meth:`cat`, you can implement the
    method ``_catHandle`` that yields the custom file handle instead::

        class custom6:
            def __init__(self, config=None): ...
            def _catHandle(self): yield io.BytesIO(b"abc\\ndef\\n123")

        class custom7:
            def __init__(self, config=None): ...
            def _catHandle(self): yield io.StringIO("abc\\ndef\\n123")

        custom6() | cat(text=False) # returns b'abc\\ndef\\n123'
        custom7() | cat() | deref() # returns ['abc', 'def', '123']

    Remember that you have to yield instead of returning the file handle. This
    is so that you can use ``with`` statements, and if you return the file
    handle, the file might be closed by the time :meth:`cat` decides to use it.

:param fileName: if None, then return a :class:`~k1lib.cli.init.BaseCli`
    that accepts a file name and outputs Iterator[str]
:param text: if True, read text file, else read binary file
:param chunks: if True then reads the file chunk by chunk, else reads the
    entire file. Defaults to True in text mode and False in binary mode
:param profile: whether to profile the file reading rate or not. Can adjust
    printing frequency using `settings.cli.cat.every`
:param sB: "start byte". Specify this if you want to start reading from this byte
:param eB: "end byte", exclusive. Default -1 means end of file"""               # cat
    if chunks is None: chunks = True if text else False # dev note: if default arguments are changed, please also change _cat()'s implementation for custom datatypes # cat
    if profile and not chunks: warnings.warn("Can't profile reading rate when you're trying to read everything at once"); profile = False # cat
    f = _cat(text, chunks, sB, eB)                                               # cat
    if profile: f = f | Profile(text)                                            # cat
    return f if fileName is None else fileName | f                               # cat
k1lib.settings.cli.cat.add("pickle", k1lib.Settings(), "inp.cat.pickle() settings") # cat
[docs]class catPickle(BaseCli):                                                  # catPickle
[docs]    def __init__(self, pickleModule=None):                                 # catPickle
        """Reads a file as a series of pickled objects. Example::

    "ab" | aS(dill.dumps) | file("test/catTest.pth")
    "cd" | aS(dill.dumps) >> file("test/catTest.pth") # append to the file
    # returns ["ab", "cd"]
    cat.pickle("test/catTest.pth") | deref()
    # also returns ["ab", "cd"], same style as :class:`cat`
    "test/catTest.pth" | cat.pickle() | deref()

The function ``cat.pickle`` internally uses this class, so don't worry about the
discrepancy in the examples. So this kinda relies on a trick that you can
continuously dump pickled objects to a single file and it would still work as
usual::

    [1, 2, 3] | aS(dill.dumps) | file("b.pth")
    [4, 5] | aS(dill.dumps) >> file("b.pth")

    with open("b.pth", "rb") as f:
        objs = [dill.load(f), dill.load(f)]
    assert objs == [[1, 2, 3], [4, 5]]

.. admonition:: tail() optimization

    If you regularly append pickled objects to files, say you're trying to save
    complex time series data, and you want to grab the most recent data, you
    might do it like ``fn | cat.pickle() | tail(10)`` to get the last 10
    objects. But normally, this means that you have to read through the entire
    file just to read the last bit of it. You can't exactly start close to the
    end, as you don't know where the object boundaries are.

    Luckily, I got fed up with this so much that now, when you do this:
    ``fn | (cat.pickle() | tail(10))`` (note the parentheses!), it will only
    read the end of the file! This optimization is mostly airtight, but not
    entirely. How it works is that cat.pickle() tries to unpickle the first
    object in the file and grab the object's size in bytes. This info is used
    to make guesses as to where it should start reading the file from, which is
    ``endByte - tail.n * object_size * 2``

    Also, this optimization has only been tested on :mod:`dill` and
    :mod:`pickle`. :mod:`cloudpickle` has been tested to not work (magic
    numbers are a little different). I can potentially add in a mechanism for
    you to specify magic numbers for any pickler you want, but it feels niche,
    so I'm going to skip that for now.

:param pickleModule: pickle module to use. Python's default is "pickle", but I
    usually use :mod:`dill` because it's more robust"""                          # catPickle
        super().__init__(capture=True); self.pm = pickleModule or dill           # catPickle
[docs]    def __ror__(self, fn):                                                 # catPickle
        pm = self.pm                                                             # catPickle
        def g(sB=0):                                                             # catPickle
            with open(os.path.expanduser(fn), "rb") as f:                        # catPickle
                f.seek(sB)                                                       # catPickle
                try:                                                             # catPickle
                    while True: yield pm.load(f)                                 # catPickle
                except EOFError: pass                                            # catPickle
        if len(self.capturedClis) >= 1:                                          # catPickle
            c0 = self.capturedClis[0]; s = os.path.getsize(fn)                   # catPickle
            if isinstance(c0, cli.tail) and s > 1e4 and isinstance(c0.n, int) and c0.n > 0: # if less than 10kB then it's not worth optimizing! # catPickle
                # figures out the size of the first element to gauge the relative steps we need to take! # catPickle
                factor = 1.5                                                     # catPickle
                while True:                                                      # catPickle
                    with open(os.path.expanduser(fn), "rb") as f:                # catPickle
                        s1 = len(pm.dumps(pm.load(f)))*factor                    # catPickle
                    arr = list(g((fn | cli.splitSeek(c=b"\x80\x04\x95", ws=[s-c0.n*s1, c0.n*s1]))[1] - 1)) # catPickle
                    if c0.n < len(arr): return arr | self.capturedSerial # then execute the rest of the pipeline # catPickle
                    factor *= 1.5 # if we guessed too conservatively, then try again with a bigger factor! # catPickle
        return g() | self.capturedSerial                                         # catPickle
def _catPickle(fileName=None, pickleModule=dill):                                # _catPickle
    """Reads a file as a series of pickled objects. See :class:`catPickle`"""    # _catPickle
    return catPickle(pickleModule) if fileName is None else fileName | catPickle(pickleModule) # _catPickle
cat.pickle = _catPickle                                                          # _catPickle
class splitSeekRes:                                                              # splitSeekRes
    def __init__(self, fn, res):                                                 # splitSeekRes
        """Return result of splitSeek(). We don't want to return just a list
alone, because we also want to pass along information like the file name"""      # splitSeekRes
        self.fn = fn; self.res = res                                             # splitSeekRes
    def __getitem__(self, idx): return self.res[idx]                             # splitSeekRes
    def __len__(self): return len(self.res)                                      # splitSeekRes
    def __iter__(self): return iter(self.res)                                    # splitSeekRes
    def __repr__(self): return self.res.__repr__()                               # splitSeekRes
[docs]class splitSeek(BaseCli):                                                  # splitSeek
[docs]    def __init__(self, n=None, c=b'\n', ws=None):                          # splitSeek
        """Splits a file up into n fragments aligned to the closest character
and returns the seek points. Example::

    # preparing the large file
    range(120) | apply(lambda x: f"{x:3}_56789") | file("test/largeFile.txt")
    # returns [0, 30, 70, 110, 150, 190, 230, 270, 300, 340]
    "test/largeFile.txt" | splitSeek(31) | head()
    # returns 32
    "test/largeFile.txt" | splitSeek(31) | shape(0)
    # returns 32, demonstrating you can also pipe file objects in, if that's what you want
    open("test/largeFile.txt") | splitSeek(31) | shape(0)
    # returns [0, 0, 10, 10, 20, 30, 30, 40, 40, 50], notice some segments have zero length
    "test/largeFile.txt" | splitSeek(200) | head()
    # returns [0, 400, 1200], demonstrating that you can split a file up unevenly by weights
    "test/largeFile.txt" | splitSeek(ws=[1, 2]) | deref()

So, the generated file has 120 lines in total. Each line is 10 bytes (9 for the
string, and 1 for the new line character). Splitting the file into 31 fragments
will result in 32 seek points (:math:`p_i\quad i\in[1, n+1]`).

Also notice how the returned seek position is not the position of the character
itself, but of the character right after. So if we're splitting by the new line
character, then it will return the next character after the new line. This is
the default because the majority of the time, I just want it to split by lines.
But if you have other uses in mind, then just subtract 1 from all returned seek
points.

You can then use these seek points to read the file in multiple
threads/processes using :meth:`cat`, like this::

    # returns [[' 0_56789', ' 1_56789', ' 2_56789'], [' 3_56789', ' 4_56789', ' 5_56789', ' 6_56789']]
    "test/largeFile.txt" | splitSeek(31) | window(2) | ~apply(lambda sB, eB: cat("test/largeFile.txt", sB=sB, eB=eB-1)) | head(2) | deref()

Because :math:`120/31\\approx4`, most of cat's reads contain 4 lines, but some
have 3 lines. Also notice that the lines smoothly transition between cat's
reads (``2_56789`` to ``3_56789``), so that's pretty nice.

Just like with :meth:`cat`, this also works with urls::

    "https://example.com" | splitSeek(10)

.. warning::

    You have to really test whether reading the same file from multiple
    processes is actually going to be faster or not. If your data is stored in
    a HDD (aka hard drive, with spinning disks), then it will actually slow you
    down (10x-100x), because the disk will have to context switch all the time,
    and each switch has a 10ms cost. You also have to take into account
    collecting together the results of all processes, which can bottleneck the
    cpu. Read more about concurrency pitfalls at
    :class:`~k1lib.cli.modifier.applyMp`.

In some scenarios where you want to adjust the seek points even more, like when
you want to parse FASTA genome files, which have blocks of 2/4 lines each like
this:

.. code-block:: text

    @FP200005993L1C001R00100000061/2
    TTTTAAACTTGCATTCTTTGGAGATTTGCTGAGTGTTGCTAGAGCTGGGAAACTTTTTTAATGAGATACGTGCATATTTTTCAAATTTACAGATCTTTTTTCACAAAAATAGAAAGTCATAAATGTGAAATGGAAACCTAAACAAGGCAA
    +
    GFEEEDEFGFFFFEFFFFFIFCEEEFFFGFFDFEGEDGFFFGDGFFGDGCE@GGGEEFDFGFGCFDFGGHCHFFFGFFFFFGEFDFFGHGFGEEHGFGEGFGFHFFEGFFFE;GEGEFGGHFFEI=GDAEDIFDDFGHFGEFGFEGGGGF
    @FP200005993L1C001R00100000167/2
    CTGGAATTTGGTATCTTATTGCCAAAGAATCTGTTTTGTGAAACTTGGGATCTCTATTTTAATGTTAATTCTGGTCAGTTGTGCCTAAACTCCATAAAGCAGGGACTATACTGAGGCGTATTCAATCTTCCTTCTTACCAAGGCCAGGAA
    +
    EEFECEDEFFCGFFFFFEEEGEGFEDECCEFEFDFEEFDFEDDFEFEEFDDFFEEFFEEFEFFHEEFEEFEFFDEFFFECF>FFFEFEDFCFFFEGFEDEEGDDFEEFEFGEEBD@EG>EEFFECEEGFEEEFFEDGEEEDE5EBDG:CC

Here, each block of 4 lines is (title, read, blank, quality). Because by
default, this will only split neatly along new lines, you will have to write
extra functions to detect if a particular seek point is desirable, and if not,
either jump forward or backward using :meth:`splitSeek.forward` and
:meth:`splitSeek.backward`. To help with this, :class:`refineSeek` has some
useful methods that you might want to check out.

:param n: how many splits do you want?
:param c: block-boundary character, usually just the new line character
:param ws: weights. If given, the splits' length ratios will roughly correspond to this""" # splitSeek
        self.n = n; self.c = c; self.ws = ws                                     # splitSeek
        if ws is None and n is None: raise Exception("Specify at least n or ws for splitSeek to work") # splitSeek
[docs]    @staticmethod                                                          # splitSeek
    def forward(f, i:int, c=b'\n') -> int:                                       # splitSeek
        """Returns char location after the search char, going forward. Example::

    f = io.BytesIO(b"123\\n456\\n789\\nabc")
    f | splitSeek(2)        # returns [0, 4, 15]
    splitSeek.forward(f, 2) # returns 4
    splitSeek.forward(f, 3) # returns 4
    splitSeek.forward(f, 4) # returns 8

:param f: file handle
:param i: current seek point
:param c: block-boundary character"""                                            # splitSeek
        def inner(f):                                                            # splitSeek
            f.seek(i)                                                            # splitSeek
            while True:                                                          # splitSeek
                b = f.tell(); s = f.read(1000); di = s.find(c)                   # splitSeek
                if di > -1: return b + di + 1                                    # splitSeek
                if len(s) == 0: return -1 # EOF; works for both str and bytes handles # splitSeek
        if isinstance(f, str):                                                   # splitSeek
            with openFile(os.path.expanduser(f), False) as _f: return inner(_f)  # splitSeek
        else: return inner(f)                                                    # splitSeek
[docs]    @staticmethod                                                          # splitSeek
    def backward(f, i:int, c=b'\n') -> int:                                      # splitSeek
        """Returns char location after the search char, going backward. Example::

    f = io.BytesIO(b"123\\n456\\n789\\nabc")
    f | splitSeek(2)         # returns [0, 4, 15]
    splitSeek.backward(f, 5) # returns 4
    splitSeek.backward(f, 4) # returns 4
    splitSeek.backward(f, 3) # returns 0

:param f: file handle
:param i: current seek point
:param c: block-boundary character"""                                            # splitSeek
        def inner(f):                                                            # splitSeek
            mul = 1                                                              # splitSeek
            while True:                                                          # splitSeek
                begin = max(i-1000*mul, 0); end = max(i-1000*(mul-1), 0); mul += 1 # search range # splitSeek
                f.seek(begin); b = f.tell(); s = f.read(end-begin); di = s.rfind(c) # splitSeek
                if di > -1: return b + di + 1                                    # splitSeek
                if b == 0: return 0                                              # splitSeek
        if isinstance(f, str):                                                   # splitSeek
            with openFile(os.path.expanduser(f), False) as _f: return inner(_f)  # splitSeek
        else: return inner(f)                                                    # splitSeek
[docs]    def __ror__(self, fn): # why return splitSeekRes instead of the seek positions as a list directly? Because we want to pass along dependencies like the file name to downstream processes like refineSeek # splitSeek
        if isinstance(fn, str): fn = os.path.expanduser(fn)                      # splitSeek
        n = self.n; c = self.c; ws = self.ws                                     # splitSeek
        def func(f): # f is a file-like object                                   # splitSeek
            f.seek(0, os.SEEK_END); end = f.tell()                               # splitSeek
            if ws is None: begins = range(n) | cli.apply(lambda x: int(x*end/n)) # splitSeek
            else: begins = range(end) | cli.splitW(*ws) | cli.apply(lambda x: x.start) # splitSeek
            return [*begins | cli.apply(lambda x: splitSeek.backward(f, x, c)), end] # splitSeek
        if isinstance(fn, str):                                                  # splitSeek
            with openFile(fn, False, True) as f: return splitSeekRes(fn, func(f)) # splitSeek
        else: return splitSeekRes(fn, func(fn))                                  # splitSeek
[docs]class refineSeek(BaseCli):                                                 # refineSeek
[docs]    def __init__(self, f=None, window=1):                                  # refineSeek
        """Refines seek positions. Example::

    # returns list of integers for seek positions
    "abc.txt" | splitSeek(30)
    # returns refined seek positions, such that the line starting at each seek position starts with "@"
    "abc.txt" | splitSeek(30) | refineSeek(lambda x: x.startswith(b"@"))
    # same thing as above
    "abc.txt" | splitSeek(30) | refineSeek(lambda x: x[0] == b"@"[0])
    # returns refined seek positions, such that the 0th line starts with "@" and the 2nd line starts with "+". This demonstrates the `window` parameter
    "abc.txt" | splitSeek(30) | refineSeek(lambda x: x[0][0] == b"@"[0] and x[2][0] == b"+"[0], 3)
    # same thing as above, demonstrating some builtin refine seek functions
    "abc.txt" | splitSeek(30) | refineSeek.fastq()

:param f: function that returns True if the current line/lines is a valid block boundary
:param window: by default (1), will fetch 1 line and check the boundary using
    ``f(line)``. If a value greater than 1 is passed (for example, 3), will
    fetch 3 lines and check the boundary using ``f([line1, line2, line3])``"""   # refineSeek
        if f is None: f = lambda x: True                                         # refineSeek
        self.f = cli.fastF(f); self.window = window; self.fn = None              # refineSeek
[docs]    def __ror__(self, seeks):                                              # refineSeek
        f = self.f; window = self.window                                         # refineSeek
        def read(fio, sB:int):                                                   # refineSeek
            fio.seek(sB)                                                         # refineSeek
            if window == 1: return fio.readline()                                # refineSeek
            return list(cli.repeatF(lambda: fio.readline(), window))             # refineSeek
        if len(seeks) <= 2: return seeks                                         # refineSeek
        fn = self.fn or seeks.fn; newSeeks = [seeks[0]]                          # refineSeek
        def process(fio): # uses the handle passed in, rather than reopening and shadowing it # refineSeek
            for seek in seeks[1:-1]:                                             # refineSeek
                line = read(fio, seek)                                           # refineSeek
                while not f(line): seek = splitSeek.forward(fio, seek); line = read(fio, seek) # refineSeek
                newSeeks.append(seek)                                            # refineSeek
            newSeeks.append(seeks[-1]); return newSeeks                          # refineSeek
        if isinstance(fn, str):                                                  # refineSeek
            with openFile(fn, False) as fio: return process(fio)                 # refineSeek
        else: return process(fn)                                                 # refineSeek
[docs]    def injectFn(self, fn):                                                # refineSeek
        """Injects file name dependency, if this is not used right after :class:`splitSeek`""" # refineSeek
        self.fn = fn; return self                                                # refineSeek
[docs]    @classmethod                                                           # refineSeek
    def fastq(cls):                                                              # refineSeek
        """Refines a fastq file's seek points"""                                 # refineSeek
        return cls(lambda x: x[0][0] == b"@"[0] and x[2][0] == b"+"[0], 3)       # refineSeek
[docs]def wget(url:str, fileName:str=None, mkdir=True):                          # wget
    """Downloads a file. Also returns the file name, in case you want to pipe it
to something else.

:param url: The url of the file
:param fileName: if None, then tries to infer it from the url
:param mkdir: whether to make the directory leading up to the file or not"""     # wget
    if fileName is None: fileName = url.split("/")[-1]                           # wget
    fileName = os.path.expanduser(fileName); dirname = os.path.dirname(fileName) # wget
    if mkdir and dirname: os.makedirs(dirname, exist_ok=True) # guard against empty dirname for bare file names # wget
    try: urllib.request.urlretrieve(url, fileName); return fileName              # wget
    except Exception: return None                                                # wget
[docs]def ls(folder:str=None):                                                   # ls
    """Lists every file and folder inside the specified folder. Example::

    # returns List[str]
    ls("/home")
    # same as above
    "/home" | ls()
    # only outputs files, not folders
    ls("/home") | filt(os.path.isfile)

This can handle things that are not plain folders. For example, it can handle a
zip file, whereby it will list out all the files contained within a particular
.zip::

    ls("abc.zip")

Then, you can use :meth:`cat` as usual, like this::

    ls("abc.zip") | item() | cat()

Pretty nice!

.. admonition:: Custom datatype

    It is possible to build objects that can interoperate with this cli, like
    this::

        class sql:
            def __init__(self): ...
            def _ls(self): return ["something", "here"]

    ls() will identify that what's piped in is not a string, and will try to
    execute the object's ``_ls()`` method, so you can simply implement it in
    your classes"""                                                              # ls
    if folder is None: return _ls()                                              # ls
    else: return folder | _ls()                                                  # ls
class _ls(BaseCli):                                                              # _ls
    def _typehint(self, ignored=None): return tList(str)                         # _ls
    def __ror__(self, path:str):                                                 # _ls
        if isinstance(path, str):                                                # _ls
            path = os.path.expanduser(path.rstrip(os.sep))                       # _ls
            if os.path.exists(path):                                             # _ls
                if os.path.isfile(path):                                         # _ls
                    if cat(path, False, eB=2) == b"PK": # list subfiles in a zip file # _ls
                        return [ZipWrapper(e, path) for e in zipfile.ZipFile(path).infolist()] # _ls
                    else: raise Exception(f"{path} is a file, not a folder, so can't list child directories") # _ls
                else: return [f"{path}{os.sep}{e}" for e in os.listdir(path)]    # _ls
            else: return []                                                      # _ls
        else: return path._ls()                                                  # _ls
k1lib.settings.cli.add("quiet", False, "whether to mute extra outputs from clis or not") # _ls
newline = b'\n'[0]                                                               # _ls
class lazySt:                                                                    # lazySt
    def __init__(self, st, text:bool):                                           # lazySt
        """Converts a byte stream into a lazy text/byte stream, with a nice __repr__.""" # lazySt
        self.st = st; self.text = text                                           # lazySt
    def __iter__(self):                                                          # lazySt
        if self.text:                                                            # lazySt
            while True:                                                          # lazySt
                line = self.st.readline()                                        # lazySt
                if len(line) == 0: break                                         # lazySt
                yield line.decode().rstrip("\n")                                 # lazySt
        else:                                                                    # lazySt
            while True:                                                          # lazySt
                line = self.st.readline()                                        # lazySt
                if len(line) == 0: break                                         # lazySt
                yield line                                                       # lazySt
    def __repr__(self): self | cli.stdout(); return ""                           # lazySt
def executeCmd(cmd:str, inp:bytes, text):                                        # executeCmd
    """Runs a command, and returns stdout and stderr streams"""                  # executeCmd
    p = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=k1lib.settings.wd) # executeCmd
    if inp is not None:                                                          # executeCmd
        if isinstance(inp, (str, bytes)): p.stdin.write(inp if isinstance(inp, bytes) else inp.encode()) # executeCmd
        else:                                                                    # executeCmd
            for e in inp:                                                        # executeCmd
                if not isinstance(e, (str, bytes)): e = str(e)                   # executeCmd
                if not isinstance(e, bytes): e = e.encode()                      # executeCmd
                p.stdin.write(e); p.stdin.write(b"\n")                           # executeCmd
    p.stdin.close(); return p, lazySt(p.stdout, text), lazySt(p.stderr, text)    # executeCmd
def printStderr(err):                                                            # printStderr
    if not k1lib.settings.cli.quiet:                                             # printStderr
        e, it = err | cli.peek()                                                 # printStderr
        if it != []: it | cli.insert("\nError encountered:\n") | cli.apply(k1lib.fmt.txt.red) | cli.stdout() # printStderr
def requireCli(cliTool:str):                                                     # requireCli
    """Searches for a particular cli tool (eg. "ls"), throws ImportError if not found, else does nothing""" # requireCli
    a = cmd(cliTool); None | a                                                   # requireCli
    if len(a.err) > 0: raise ImportError(f"""Can't find cli tool {cliTool}. Please install it first.""") # requireCli
[docs]class cmd(BaseCli):                                                        # cmd
    _forked_num_mode3 = 0                                                        # cmd
[docs]    def __init__(self, cmd:str, mode:int=1, text=True, block=False): # 0: return (stdout, stderr). 1: return stdout, 2: return stderr # cmd
        """Runs a command, and returns the output line by line. Can pipe in some
inputs. If there are no inputs, then you have to pipe in :data:`None`. Example::

    # return detailed list of files
    None | cmd("ls -la")
    # return list of files that ends with "ipynb"
    None | cmd("ls -la") | cmd('grep ipynb$')

It might be tiresome to pipe in :data:`None` all the time. So, you can use the
">" operator to yield values right away::

    # prints out first 10 lines of list of files
    cmd("ls -la") > headOut()

If you're using Jupyter notebook/lab, then if you were to display a :class:`cmd`
object, it will print out the outputs. So, a single command ``cmd("mkdir")``
displayed at the end of a cell is enough to trigger creating the directory.

Reminder that the ">" operator here sort of has a different meaning to that of
:class:`~k1lib.cli.init.BaseCli`. So you kinda have to be careful about this::

    # returns a serial cli, cmd not executed
    cmd("ls -la") | deref()
    # executes cmd with no input stream and pipes output to deref
    cmd("ls -la") > deref()
    # returns a serial cli
    cmd("ls -la") > grep("txt") > headOut()
    # executes pipeline
    cmd("ls -la") > grep("txt") | headOut()

General advice is, right after a :class:`cmd`, use ">", and use "|" everywhere
else.

Let's see a few more exotic examples. File ``a.sh``:

.. code-block:: bash

    #!/bin/bash
    echo 1; sleep 0.5
    echo This message goes to stderr >&2
    echo 2; sleep 0.5
    echo $(</dev/stdin)
    sleep 0.5; echo 3

Examples::

    # returns [b'1\\n', b'2\\n', b'45\\n', b'3\\n'] and prints out the error message
    "45" | cmd("./a.sh", text=False) | deref()
    # returns [b'This message goes to stderr\\n']
    "45" | cmd("./a.sh", mode=2, text=False) | deref()
    # returns [[b'1\\n', b'2\\n', b'45\\n', b'3\\n'], [b'This message goes to stderr\\n']]
    "45" | cmd("./a.sh", mode=0, text=False) | deref()

Performance-wise, stdout and stderr will yield values right away as soon as the
process outputs them, so you get real time feedback. However, this will convert
the entire input into a :class:`bytes` object, and not feed it bit by bit
lazily, so if you have a humongous input, it might slow you down a little.

Also, because stdout and stderr yield values right away, it means that if you
want the operation to be blocking until finished, you have to consume the
output::

    None | cmd("mkdir abc")
    # might fail, because this might get executed before the previous line
    None | cmd("echo a>abc/rg.txt")

    None | cmd("mkdir abc") | ignore()
    # will succeed, because this will be guaranteed to execute after the previous line
    None | cmd("echo a>abc/rg.txt")

Settings:

- cli.quiet: if True, won't display errors in mode 1

:param mode: if 0, returns ``(stdout, stderr)``. If 1, returns ``stdout`` and
    prints ``stderr`` if there are any errors. If 2, returns ``stderr``. If 3,
    daemonizes the process and returns nothing
:param text: whether to decode the outputs into :class:`str` or return raw :class:`bytes`
:param block: whether to wait for the task to finish before returning to Python or not""" # cmd
        super().__init__(); self.cmd = cmd; self.mode = mode                     # cmd
        self.text = text; self.block = block; self.ro = k1lib.RunOnce()          # cmd
    def _typehint(self, ignored=None):                                           # cmd
        t = tIter(str) if self.text else tIter(bytes)                            # cmd
        if self.mode == 0: return tCollection(t, t)                              # cmd
        return t                                                                 # cmd
[docs]    def __ror__(self, it:Union[None, str, bytes, Iterator[Any]]) -> Iterator[Union[str, bytes]]: # cmd
        """Pipes in lines of input, or if there's nothing to pass, then pass None""" # cmd
        if self.mode == 3: # this needed so much time to get right, damn         # cmd
            if it is not None: raise Exception("cmd() has .mode = 3, which means it's executed as a daemon, so it should not take in any input from stdin. Do `None | cmd(..., mode=3)` instead") # cmd
            cwd = k1lib.settings.wd                                              # cmd
            if os.fork() == 0:                                                   # cmd
                with k1lib.ignoreWarnings(): subprocess.Popen(f"{self.cmd} >/dev/null 2>&1", shell=True, cwd=cwd, close_fds=True, preexec_fn=os.setpgrp).wait()#; subprocess.Popen(f'disown {p.pid}', shell=True) # cmd
            else: cmd._forked_num_mode3 += 1; return None                        # cmd
        else:                                                                    # cmd
            mode = self.mode # grab this before the RunOnce check, so it's always bound # cmd
            if not self.ro.done(): self.p, self.out, self.err = executeCmd(self.cmd, it, self.text) # cmd
            if self.block: self.out = self.out | cli.deref(); self.err = self.err | cli.deref() # cmd
            if mode == 0: return (self.out, self.err)                            # cmd
            elif mode == 1: threading.Thread(target=lambda: printStderr(self.err)).start(); return self.out # cmd
            elif mode == 2: return self.err                                      # cmd
    def __gt__(self, it): return None | self | it                                # cmd
    def __repr__(self): return (None | self).__repr__()                          # cmd
[docs]class walk(BaseCli):                                                       # walk
[docs]    def __init__(self, **kwargs):                                          # walk
        """Recursively gets all files inside a directory. Example::

    # prints out first 10 files
    "." | walk() | headOut()"""                                                  # walk
        self.kwargs = kwargs                                                     # walk
[docs]    def __ror__(self, path):                                               # walk
        return os.walk(path, **self.kwargs) | ~cli.apply(lambda x, y, z: z | cli.apply(lambda e: x + os.sep + e)) | cli.joinStreams() # walk
[docs]def urlPath(base:str, host:bool=True):                                     # urlPath
    """Translates from a url to a file path. Example::

    base = "~/ssd/some/other/path"
    url = "http://example.com/some/path/to/file.txt"
    # returns "~/ssd/some/other/path/example_com/some/path/to/file.txt"
    url | urlPath(base)
    # returns "~/ssd/some/other/path/some/path/to/file.txt"
    url | urlPath(base, False)

:param base: base directory you want the files to be in"""                       # urlPath
    base = base.rstrip("/\\")                                                    # urlPath
    def inner(url):                                                              # urlPath
        p = urllib.parse.urlparse(url)                                           # urlPath
        a = (p.netloc.replace(".", "_") + os.sep) if host else ""                # urlPath
        return f"{base}{os.sep}{a}{p.path.strip('/')}".replace("//", "/")        # urlPath
    return cli.aS(inner)                                                         # urlPath
import bz2, gzip, zlib # urlPath
[docs]class kzip(BaseCli):                                                       # kzip
[docs]    def __init__(self, fmt="gz"):                                          # kzip
        """Incrementally compresses a stream of data using gzip or bzip2. Example::

    data = range(100) | apply(lambda x: str(x).encode()) | deref() # list of bytes
    data | kzip() | deref() # returns list of bytes
    range(100) | apply(str) | kzip() | deref() # returns list of bytes, demonstrating that piping an iterator of strings works

Quick reminder that if you pass in a bytes iterator, then it will be compressed
as-is. But if you pass in a string iterator, then it will append a new line
character to each string and then compress it. This is more intuitive, and it
makes the style consistent with :class:`~k1lib.cli.output.file`. Passing in a
single string or bytes object is ok too::

    "0123456789" | kzip()  # returns gz bytes
    b"0123456789" | kzip() # returns gz bytes, exact same pattern as above

Why "kzip" and not just "zip"? Because "zip" is already a python builtin.
See also: :class:`kunzip`

:param fmt: desired compressed format. Currently only supports "gz" or "bz2" """ # kzip
        fmt = fmt.strip(".")                                                     # kzip
        if fmt == "gzip" or fmt == "gz": self.o = zlib.compressobj(wbits=31)     # kzip
        elif fmt == "bzip2" or fmt == "bz2": self.o = bz2.BZ2Compressor()        # kzip
        else: raise Exception(f"File type {fmt} not supported. Specify either 'gz' or 'bz2'") # kzip
[docs]    def __ror__(self, it):                                                 # kzip
        o = self.o                                                               # kzip
        def gen():                                                               # kzip
            for e in it:                                                         # kzip
                if isinstance(e, str): e = f"{e}\n".encode()                     # kzip
                res = o.compress(e)                                              # kzip
                if res: yield res                                                # kzip
            try:                                                                 # kzip
                res = o.flush()                                                  # kzip
                if res: yield res                                                # kzip
            except: pass                                                         # kzip
        if isinstance(it, str): it = it.encode()                                 # kzip
        if isinstance(it, bytes): return [o.compress(it), o.flush()] | cli.filt("x") | cli.aS(b"".join) # kzip
        else: return gen()                                                       # kzip
class decompressobj: # .gz or .bz2 file                                          # decompressobj
    """Why not just use zlib.decompressobj() directly? I encountered a strange
bug when trying to decompress CommonCrawl's gzip files, posted on Stack Overflow
and got help from none other than Mark Adler, creator of gzip and zlib. That
guy's super active on Stack Overflow. Anyway, this code here is a modified
version of his code. Here's the discussion:
https://stackoverflow.com/questions/76452480/pythons-zlib-doesnt-work-on-commoncrawl-file""" # decompressobj
    def __init__(self, gz=True): self.gz = gz                                    # decompressobj
    def __ror__(self, it):                                                       # decompressobj
        gz = self.gz; data = left = b''; o = zlib.decompressobj(zlib.MAX_WBITS|32) if gz else bz2.BZ2Decompressor() # decompressobj
        for got in it:                                                           # decompressobj
            yield o.decompress(left + got); left = b''                           # decompressobj
            if o.eof: left = o.unused_data; o = zlib.decompressobj(zlib.MAX_WBITS|32) if gz else bz2.BZ2Decompressor() # decompressobj
            if len(got) == 0 and len(left) == 0: break                           # decompressobj
stream_unzip = k1lib.dep("stream_unzip", url="https://pypi.org/project/stream-unzip/") # decompressobj
class decompressobjZip: # .zip files                                             # decompressobjZip
    def __ror__(self, it):                                                       # decompressobjZip
        for a, b, c in stream_unzip.stream_unzip(it): yield from c; return # only yields the first subfile # decompressobjZip
[docs]class kunzip(BaseCli):                                                     # kunzip
[docs]    def __init__(self, text=False):                                        # kunzip
        """Incrementally decompresses a stream of data using gzip or bzip2. Example::

    # returns an iterator of bytes. The cat() command can be shortened to cat("someFile.gz", False, True)
    cat("someFile.gz", text=False, chunks=True) | unzip()
    # incrementally fetches the remote file, then incrementally unzips it
    cat("https://example.com/someFile.gz", False, True) | unzip()

    data = range(100) | apply(lambda x: str(x).encode()) | deref() # list of bytes
    data | kzip() | unzip() | deref() # returns the original data as a list of bytes. May split the bytes at different positions though

How does it know which algorithm to pick to decompress? It looks at the first
few bytes of the file for its magic number. Also, if you're expecting a text
file after decompression, you can do this to get the lines directly::

    # returns iterator of strings
    cat("https://example.com/someFile.gz", False, True) | unzip(True)

One more thing. If you're planning to use this to download whole files, you
might want to tune the chunk size in :data:`~k1lib.settings`.cli.cat.chunkSize,
as raising it might speed things up considerably. Or maybe you should just use
wget instead =))

More examples of the different styles you can use with this cli::

    ["abc"] | kzip() | unzip(True) | deref() # returns ["abc"]
    "abc" | kzip() | unzip(True) # returns "abc"
    "abc" | kzip() | unzip()     # returns b"abc"

See also: :class:`kzip`

.. admonition:: Reading files directly

    The original purpose of this cli is to incrementally decompress a byte
    stream. But a lot of the time, that byte stream is coming from a file, and
    typing out ``cat(fn, False, True)`` to generate a byte stream to feed into
    this cli is tedious. So instead, you can pipe in the file name and this
    will just unzip it and return the string/byte stream to you::

        "abc.gz" | unzip()       # returns byte iterator; pass text=True for a string iterator
        "abc.bz2" | unzip()      # same
        "abc.zip" | unzip()      # returns contents of the first subfile only. All subsequent subfiles are ignored

.. admonition:: .zip files

    This can also work with subfiles within .zip files, like this::

        "abc.zip" | unzip()                   # unzips the first subfile
        "abc.zip" | ls()                      # lists out all subfiles
        "abc.zip" | ls() | rItem(2) | unzip() # unzips the 3rd subfile

:param text: whether to yield string lines or bytes"""                           # kunzip
        self.text = text                                                         # kunzip
[docs]    def __ror__(self, it):                                                 # kunzip
        def gen(it): # generates a stream of bytes                               # kunzip
            it = iter(it); e = next(it)                                          # kunzip
            if e[:2] == b"\x1f\x8b": o = decompressobj(True)    # .gz            # kunzip
            elif e[:3] == b"BZh": o = decompressobj(False)      # .bz2           # kunzip
            elif e[:2] == b"PK": o = decompressobjZip()         # .zip           # kunzip
            else: raise Exception("Can't infer the file type (whether gz, bz2 or zip) of this file") # kunzip
            yield from [[e], it] | cli.joinStreams() | o                         # kunzip
        single = False                                                           # kunzip
        if isinstance(it, str): return cat(it, False, True) | kunzip(self.text) # special case for reading file names directly # kunzip
        if isinstance(it, ZipWrapper): return it | cat(text=self.text, chunks=True) # special case for .zip subfile # kunzip
        if isinstance(it, bytes): single = True; it = [it]                       # kunzip
        if self.text:                                                            # kunzip
            def gen2(it): # generates a stream of strings                        # kunzip
                last = b"" # the last split is probably not a real line boundary, so carry it over # kunzip
                for e in gen(it):                                                # kunzip
                    e = last + e; splits = e.split(b"\n")                        # kunzip
                    for line in splits[:-1]: yield line.decode()                 # kunzip
                    last = splits[-1]                                            # kunzip
                if last: yield last.decode() # guard avoids a spurious trailing empty line when the stream ends with \n # kunzip
            res = gen2(it)                                                       # kunzip
        else: res = gen(it)                                                      # kunzip
        if not single: return res                                                # kunzip
        else: return "\n".join(res) if self.text else b"".join(res)              # kunzip
unzip = kunzip # kunzip
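# Round-trip sketch (illustrative): kunzip infers the codec from magic numbers
# (b"\x1f\x8b" for gzip, b"BZh" for bzip2, b"PK" for zip), so kzip's output can be
# piped straight back in:
#
#   b"hello world" | kzip("bz2") | unzip() # returns b"hello world"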