Source code for k1lib.kcom

# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
"""
This module is for communication with hardware. This is exposed automatically with::

   from k1lib.imports import *
   kcom.Gsm # exposed
"""
import k1lib, base64, io, os, time; import k1lib.cli as cli; import numpy as np; from collections import deque
serial = k1lib.dep("serial", "pyserial", "https://pyserial.readthedocs.io")
__all__ = ["Gsm", "Host"]
[docs]class Gsm: # Gsm
[docs] def __init__(self, device="/dev/ttyUSB0"): # Gsm """Communicates with a GSM module using AT codes via a serial device. Example:: g = kcom.Gsm("/dev/ttyUSB0") g # display in cell, will show basic info g.sendSms("+1215123456", "some message") s.readSms() """ # Gsm self.device = device # Gsm self.conn = serial.Serial(device) # Gsm
[docs] def sendRaw(self, data:bytes): # Gsm """Sends data to the module and read response until 'OK\\n' is received""" # Gsm if isinstance(data, str): data = data.encode() # Gsm if not isinstance(data, bytes): raise Exception("Data has to be bytes") # Gsm self.conn.write(data + b"\n"); res = b"" # Gsm while True: # Gsm time.sleep(0.003); res += self.conn.read_all().replace(b"\r", b"") # Gsm if res[-3:] == b"OK\n": return res # Gsm
[docs] def send(self, data:bytes): # Gsm """Similar to :meth:`sendRaw`, but this time clean up control signals like 'OK\\n'""" # Gsm res = self.sendRaw(data); splits = deque(res.split(b"\n")); splits.popleft() # Gsm while len(splits) and splits[-1] == b"": splits.pop() # Gsm if len(splits) and splits[-1] == b"OK": splits.pop() # Gsm while len(splits) and splits[-1] == b"": splits.pop() # Gsm return b"\n".join(splits) # Gsm
[docs] def sendSms(self, number:str, data:bytes): # Gsm """Sends text message to some number.""" # Gsm if isinstance(data, str): data = data.encode() # Gsm if not isinstance(data, bytes): raise Exception("Data has to be bytes") # Gsm return self.send(f'AT+CMGS="{number}"\n'.encode() + data + b"\x1a") # Gsm
[docs] def readSms(self, mode=1): # Gsm """Reads text messages. :param mode: 0 for all, 1 for unread, 2 for read""" # Gsm self.send(b"AT+CMGF=1\n") # sets text mode # Gsm if mode == 0: return self.send(b'AT+CMGL="ALL"') # Gsm elif mode == 1: return self.send(b'AT+CMGL="REC UNREAD"') # Gsm elif mode == 2: return self.send(b'AT+CMGL="REC READ"') # Gsm
[docs] def close(self): self.conn.close() # Gsm
def __repr__(self): # Gsm return [ # Gsm ["International Mobile Subscriber Identity", self.send(b"AT+CIMI")], # Gsm ["Ready?", self.send(b"AT+CPIN?")], # Gsm ["Cellular network registration", self.send(b"AT+CREG?")], # Gsm ["Signal strength & bit error rate", self.send(b"AT+CSQ")], # Gsm ["General Packet Radio Service (aka data)", self.send(b"AT+CGATT?")], # Gsm ["Packet Data Protocol context", self.send(b"AT+CGACT?").replace(b"\n", b" - ")]] | cli.apply(cli.op().decode(), 1) | cli.pretty() | cli.join("\n") # Gsm
import shlex # Gsm _host_baseFn = [None]; _host_autoInc = k1lib.AutoIncrement() # Gsm def _host_getTmpDir(): # _host_getTmpDir if _host_baseFn[0] is None: _host_baseFn[0] = b"" | cli.file(); os.remove(_host_baseFn[0]) # _host_getTmpDir return _host_baseFn[0] + f"_{int(time.time())}_{_host_autoInc()}_{os.getpid()}" # _host_getTmpDir def _host_stripOut(out:"list[bytes]", preds): # _host_stripOut out = deque(out) # _host_stripOut while len(out) > 0: # removing annoying log messages from ssh and bash # _host_stripOut broken = False # _host_stripOut for pred in preds: # _host_stripOut if pred(out[0]): out.popleft(); broken = True; break # _host_stripOut if not broken: break # _host_stripOut return list(out) # _host_stripOut
[docs]class Host: # Host
[docs] def __init__(self, user=None, host=None, container=None, verbose=False, password=None, stripPredicates=[]): # Host """Represents another computer that you might want to execute commands in. Examples:: h = kcom.Host("username", "hostname") h.execPy("print('something')", fns=["~/.bashrc"], rmFn=False) # returns [List[bytes], List[bytes], List[bytes]] There are several available modes:: h = kcom.Host(user="username", host="hostname") # Mode 1: executes in ssh host h = kcom.Host(container="nginx-1") # Mode 2: executes in container on localhost h = kcom.Host(host="hostname", container="nginx-1") # Mode 3: executes in container on remote ssh host Mode 1 and 2 are relatively straightforward and airtight. Mode 3 is a little buggy in stdout and stderr. Once you have constructed them, you can execute random python/bash scripts in the remote host:: h.execPy("import os; print(os.environ)") Main value of this class is that it can execute random scripts in a convenient manner. I've found myself needing this way more often than expected. :param user: username of the ssh host :param host: host name of the ssh host :param container: container name. If specified will run commands inside that container :param verbose: if True, will print out detailed commands that're executed and their results :param password: if True, will try to login to ssh host using a password instead of the default key file :param stripPredicates: list of predicates that if match, will delete those first few lines in stdout and stderr. This is to strip away annoying boilerplate messages that I couldn't quite get rid of myself """ # Host self.user = user; self.host = host; self.container = container; self._connStatus = None; self.verbose = verbose; self.password = password; self.stripPredicates = [cli.op.solidify(f) for f in [lambda x: x.startswith(b"Connection to "), *stripPredicates]] # Host self._userHost = None if host is None else (host if user is None else f"{user}@{host}"); self.pwPrefix = f"sshpass -p {shlex.quote(password)} " if password else "" # Host
def _connCheck(self): # Host if len(self._exec1("pwd")) == 0: raise Exception(f"Can't connect. Please check your ssh/docker permissions") # Host def _exec1(self, c): # execute normal commands, no input expected, just output # Host if self.verbose: print(f"executing command: {repr(c)}") # Host if self.container is None: # ssh only # Host c = f"{self.pwPrefix}ssh {self._userHost} {shlex.quote(c)}" # Host if self.verbose: print(f"-----command: {repr(c)}") # Host res = None | cli.cmd(c, mode=0, text=False) | cli.deref() # Host if self.verbose: print(f"-----res: {repr(res)}") # Host return res # Host elif self.host is None: # container only # Host c = f"docker exec -i {self.container} sh -c {shlex.quote(c)}" # Host if self.verbose: print(f"-----command: {repr(c)}") # Host res = None | cli.cmd(c, mode=0, text=False) | cli.deref() # Host if self.verbose: print(f"-----res: {repr(res)}") # Host return res # Host else: # ssh and container # Host d = f"docker exec -i {self.container} sh -c {shlex.quote(c)}"; e = f"bash -ic {shlex.quote(d)}" # Host # e = shlex.quote(f"{{ bash -ic {shlex.quote(d)} }} 2>/dev/null") # Host c = f"{self.pwPrefix}ssh {self._userHost} -tt {shlex.quote(e)}" # Host if self.verbose: print(f"-----command: {repr(c)}") # Host res = None | cli.cmd(c, mode=0, text=False) | cli.apply(_host_stripOut, preds=self.stripPredicates) | cli.deref() # Host if self.verbose: print(f"-----res: {repr(res)}") # Host return res # Host def _exec2(self, fnLocal, fnRemote): # send file to remote, no output expected, just input # Host if self.verbose: print(f"transferring file {repr(fnLocal)} to {repr(fnRemote)}") # Host if self.container is None: # ssh only # Host d = f"cat > {shlex.quote(fnRemote)}"; c = f"({self.pwPrefix}ssh {self._userHost} -T {shlex.quote(d)}) < {shlex.quote(fnLocal)}" # Host if self.verbose: print(f"-----command: {repr(c)}") # Host res = None | cli.cmd(c, mode=0, text=False) | cli.deref() # Host if self.verbose: print(f"-----res: {repr(res)}") # Host return res # Host elif self.host is None: # container only # Host d = f'cat > {shlex.quote(fnRemote)}'; c = f"(docker exec -i {self.container} sh -c {shlex.quote(d)}) < {shlex.quote(fnLocal)}" # Host if self.verbose: print(f"-----command: {repr(c)}") # Host res = None | cli.cmd(c, mode=0, text=False) | cli.deref() # Host if self.verbose: print(f"-----res: {repr(res)}") # Host return res # Host else: # ssh and container. TODO: both stdout and stderr goes to stdout, and stderr has some filler material from logging in. Need to fix, but am lazy # Host c = f"cat > {shlex.quote(fnRemote)}"; d = f"docker exec -i {self.container} sh -c {shlex.quote(c)}"; e = f"bash -ic {shlex.quote(d)}" # Host c = f"({self.pwPrefix}ssh {self._userHost} -t {shlex.quote(e)}) < {shlex.quote(fnLocal)}" # Host if self.verbose: print(f"-----command: {repr(c)}") # Host res = None | cli.cmd(c, mode=0, text=False) | cli.deref() # Host if self.verbose: print(f"-----res: {repr(res)}") # Host return res # Host def _exec(self, c, fns, rmFn, executable): # Host _userHost = self._userHost; tmpDir = _host_getTmpDir() # Host self._connCheck(); None | cli.cmd(f"mkdir -p {shlex.quote(tmpDir)}") | cli.deref(); self._exec1(f"mkdir -p {shlex.quote(tmpDir)}") # Host c | cli.file(f"{tmpDir}/script"); self._exec2(f"{tmpDir}/script", f"{tmpDir}/script") # Host out, err = self._exec1(f"{shlex.quote(executable)} {shlex.quote(tmpDir + '/script')}") # Host if fns is not None: # Host files = fns | cli.apply(lambda fn: self._exec1(f"cat {shlex.quote(fn)}") | cli.item()) | cli.deref() # Host if rmFn: fns | cli.apply(lambda fn: self._exec1(f"rm {shlex.quote(fn)}") | cli.item()) | cli.deref() # Host else: files = [] # Host None | cli.cmd(f"rm -rf {shlex.quote(tmpDir)}") | cli.deref(); self._exec1(f"rm -rf {shlex.quote(tmpDir)}"); return [out, err, *files] # Host
[docs] def execPy(self, c:str, fns=None, rmFn=False, pyExec=None): # Host """Executes some python code. Examples:: If .fns is not specified, then will return (List[bytes], List[bytes]) containing (stdout, stderr). If .fns is specified, then will return (List[bytes], List[bytes], List[bytes], ...) containing (stdout, stderr, fn1, fn2, ...). Each file is a List[bytes], with endline byte at the end of each bytes chunk :param c: Python commands :param fns: file names to retrieve. Don't use paths that can expand, like "~/.bashrc" or "$HOME/.bashrc", they won't be expanded :param rmFn: if True, removes the files after running and exiting, else don't remove the files. Some scripts auto generates files that should be removed, but others don't :param pyExec: py executable used to run the generated python file, like "/usr/bin/python". If not specified will try to auto detect what python binaries are available """ # Host if pyExec is None: # Host ans = self._exec1("which python") # Host if len(ans[0]) == 0: raise Exception("No python binary on PATH found. Please manually specify the location of the Python binary by setting .pyExec") # Host pyExec = ans[0][-1].decode().strip("\r\n") # Host return self._exec(c, fns, rmFn, pyExec) # Host
[docs] def execSh(self, c:str, fns=None, rmFn=False, shExec="/bin/bash"): # Host """Executes some bash code. Pretty much the same as :meth:`execPy`, but for bash. See that method's docs.""" # Host return self._exec(c, fns, rmFn, shExec) # Host
def __repr__(self): # Host if self._connStatus is None: self._connStatus = len(self._exec1("pwd")[0]) > 0 # Host u = f"user='{self.user}' " if self.user else ""; h = f"host='{self.host}' " if self.host else "" # Host c = f"container='{self.container}' " if self.container else "" # Host return f"<Host {u}{h}{c}connected={self._connStatus}>" # Host