# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
"""
This module is for communication with hardware. This is exposed automatically with::
from k1lib.imports import *
kcom.Gsm # exposed
"""
import k1lib, base64, io, os, time; import k1lib.cli as cli; import numpy as np; from collections import deque
serial = k1lib.dep("serial", "pyserial", "https://pyserial.readthedocs.io")
__all__ = ["Gsm", "Host"]
[docs]class Gsm: # Gsm
[docs] def __init__(self, device="/dev/ttyUSB0"): # Gsm
"""Communicates with a GSM module using AT codes via a serial device.
Example::
g = kcom.Gsm("/dev/ttyUSB0")
g # display in cell, will show basic info
g.sendSms("+1215123456", "some message")
s.readSms()
""" # Gsm
self.device = device # Gsm
self.conn = serial.Serial(device) # Gsm
[docs] def sendRaw(self, data:bytes): # Gsm
"""Sends data to the module and read response until 'OK\\n' is received""" # Gsm
if isinstance(data, str): data = data.encode() # Gsm
if not isinstance(data, bytes): raise Exception("Data has to be bytes") # Gsm
self.conn.write(data + b"\n"); res = b"" # Gsm
while True: # Gsm
time.sleep(0.003); res += self.conn.read_all().replace(b"\r", b"") # Gsm
if res[-3:] == b"OK\n": return res # Gsm
[docs] def send(self, data:bytes): # Gsm
"""Similar to :meth:`sendRaw`, but this time clean up control signals like 'OK\\n'""" # Gsm
res = self.sendRaw(data); splits = deque(res.split(b"\n")); splits.popleft() # Gsm
while len(splits) and splits[-1] == b"": splits.pop() # Gsm
if len(splits) and splits[-1] == b"OK": splits.pop() # Gsm
while len(splits) and splits[-1] == b"": splits.pop() # Gsm
return b"\n".join(splits) # Gsm
[docs] def sendSms(self, number:str, data:bytes): # Gsm
"""Sends text message to some number.""" # Gsm
if isinstance(data, str): data = data.encode() # Gsm
if not isinstance(data, bytes): raise Exception("Data has to be bytes") # Gsm
return self.send(f'AT+CMGS="{number}"\n'.encode() + data + b"\x1a") # Gsm
[docs] def readSms(self, mode=1): # Gsm
"""Reads text messages.
:param mode: 0 for all, 1 for unread, 2 for read""" # Gsm
self.send(b"AT+CMGF=1\n") # sets text mode # Gsm
if mode == 0: return self.send(b'AT+CMGL="ALL"') # Gsm
elif mode == 1: return self.send(b'AT+CMGL="REC UNREAD"') # Gsm
elif mode == 2: return self.send(b'AT+CMGL="REC READ"') # Gsm
[docs] def close(self): self.conn.close() # Gsm
def __repr__(self): # Gsm
return [ # Gsm
["International Mobile Subscriber Identity", self.send(b"AT+CIMI")], # Gsm
["Ready?", self.send(b"AT+CPIN?")], # Gsm
["Cellular network registration", self.send(b"AT+CREG?")], # Gsm
["Signal strength & bit error rate", self.send(b"AT+CSQ")], # Gsm
["General Packet Radio Service (aka data)", self.send(b"AT+CGATT?")], # Gsm
["Packet Data Protocol context", self.send(b"AT+CGACT?").replace(b"\n", b" - ")]] | cli.apply(cli.op().decode(), 1) | cli.pretty() | cli.join("\n") # Gsm
import shlex # Gsm
_host_baseFn = [None]; _host_autoInc = k1lib.AutoIncrement() # Gsm
def _host_getTmpDir(): # _host_getTmpDir
if _host_baseFn[0] is None: _host_baseFn[0] = b"" | cli.file(); os.remove(_host_baseFn[0]) # _host_getTmpDir
return _host_baseFn[0] + f"_{int(time.time())}_{_host_autoInc()}_{os.getpid()}" # _host_getTmpDir
def _host_stripOut(out:"list[bytes]", preds): # _host_stripOut
out = deque(out) # _host_stripOut
while len(out) > 0: # removing annoying log messages from ssh and bash # _host_stripOut
broken = False # _host_stripOut
for pred in preds: # _host_stripOut
if pred(out[0]): out.popleft(); broken = True; break # _host_stripOut
if not broken: break # _host_stripOut
return list(out) # _host_stripOut
[docs]class Host: # Host
[docs] def __init__(self, user=None, host=None, container=None, verbose=False, password=None, stripPredicates=[]): # Host
"""Represents another computer that you might want to execute commands in.
Examples::
h = kcom.Host("username", "hostname")
h.execPy("print('something')", fns=["~/.bashrc"], rmFn=False) # returns [List[bytes], List[bytes], List[bytes]]
There are several available modes::
h = kcom.Host(user="username", host="hostname") # Mode 1: executes in ssh host
h = kcom.Host(container="nginx-1") # Mode 2: executes in container on localhost
h = kcom.Host(host="hostname", container="nginx-1") # Mode 3: executes in container on remote ssh host
Mode 1 and 2 are relatively straightforward and airtight. Mode 3 is a little buggy
in stdout and stderr. Once you have constructed them, you can execute random
python/bash scripts in the remote host::
h.execPy("import os; print(os.environ)")
Main value of this class is that it can execute random scripts in a convenient
manner. I've found myself needing this way more often than expected.
:param user: username of the ssh host
:param host: host name of the ssh host
:param container: container name. If specified will run commands inside that container
:param verbose: if True, will print out detailed commands that're executed and their results
:param password: if True, will try to login to ssh host using a password instead of the default key file
:param stripPredicates: list of predicates that if match, will delete those first few lines in stdout
and stderr. This is to strip away annoying boilerplate messages that I couldn't quite get rid of myself
""" # Host
self.user = user; self.host = host; self.container = container; self._connStatus = None; self.verbose = verbose; self.password = password; self.stripPredicates = [cli.op.solidify(f) for f in [lambda x: x.startswith(b"Connection to "), *stripPredicates]] # Host
self._userHost = None if host is None else (host if user is None else f"{user}@{host}"); self.pwPrefix = f"sshpass -p {shlex.quote(password)} " if password else "" # Host
def _connCheck(self): # Host
if len(self._exec1("pwd")) == 0: raise Exception(f"Can't connect. Please check your ssh/docker permissions") # Host
def _exec1(self, c): # execute normal commands, no input expected, just output # Host
if self.verbose: print(f"executing command: {repr(c)}") # Host
if self.container is None: # ssh only # Host
c = f"{self.pwPrefix}ssh {self._userHost} {shlex.quote(c)}" # Host
if self.verbose: print(f"-----command: {repr(c)}") # Host
res = None | cli.cmd(c, mode=0, text=False) | cli.deref() # Host
if self.verbose: print(f"-----res: {repr(res)}") # Host
return res # Host
elif self.host is None: # container only # Host
c = f"docker exec -i {self.container} sh -c {shlex.quote(c)}" # Host
if self.verbose: print(f"-----command: {repr(c)}") # Host
res = None | cli.cmd(c, mode=0, text=False) | cli.deref() # Host
if self.verbose: print(f"-----res: {repr(res)}") # Host
return res # Host
else: # ssh and container # Host
d = f"docker exec -i {self.container} sh -c {shlex.quote(c)}"; e = f"bash -ic {shlex.quote(d)}" # Host
# e = shlex.quote(f"{{ bash -ic {shlex.quote(d)} }} 2>/dev/null") # Host
c = f"{self.pwPrefix}ssh {self._userHost} -tt {shlex.quote(e)}" # Host
if self.verbose: print(f"-----command: {repr(c)}") # Host
res = None | cli.cmd(c, mode=0, text=False) | cli.apply(_host_stripOut, preds=self.stripPredicates) | cli.deref() # Host
if self.verbose: print(f"-----res: {repr(res)}") # Host
return res # Host
def _exec2(self, fnLocal, fnRemote): # send file to remote, no output expected, just input # Host
if self.verbose: print(f"transferring file {repr(fnLocal)} to {repr(fnRemote)}") # Host
if self.container is None: # ssh only # Host
d = f"cat > {shlex.quote(fnRemote)}"; c = f"({self.pwPrefix}ssh {self._userHost} -T {shlex.quote(d)}) < {shlex.quote(fnLocal)}" # Host
if self.verbose: print(f"-----command: {repr(c)}") # Host
res = None | cli.cmd(c, mode=0, text=False) | cli.deref() # Host
if self.verbose: print(f"-----res: {repr(res)}") # Host
return res # Host
elif self.host is None: # container only # Host
d = f'cat > {shlex.quote(fnRemote)}'; c = f"(docker exec -i {self.container} sh -c {shlex.quote(d)}) < {shlex.quote(fnLocal)}" # Host
if self.verbose: print(f"-----command: {repr(c)}") # Host
res = None | cli.cmd(c, mode=0, text=False) | cli.deref() # Host
if self.verbose: print(f"-----res: {repr(res)}") # Host
return res # Host
else: # ssh and container. TODO: both stdout and stderr goes to stdout, and stderr has some filler material from logging in. Need to fix, but am lazy # Host
c = f"cat > {shlex.quote(fnRemote)}"; d = f"docker exec -i {self.container} sh -c {shlex.quote(c)}"; e = f"bash -ic {shlex.quote(d)}" # Host
c = f"({self.pwPrefix}ssh {self._userHost} -t {shlex.quote(e)}) < {shlex.quote(fnLocal)}" # Host
if self.verbose: print(f"-----command: {repr(c)}") # Host
res = None | cli.cmd(c, mode=0, text=False) | cli.deref() # Host
if self.verbose: print(f"-----res: {repr(res)}") # Host
return res # Host
def _exec(self, c, fns, rmFn, executable): # Host
_userHost = self._userHost; tmpDir = _host_getTmpDir() # Host
self._connCheck(); None | cli.cmd(f"mkdir -p {shlex.quote(tmpDir)}") | cli.deref(); self._exec1(f"mkdir -p {shlex.quote(tmpDir)}") # Host
c | cli.file(f"{tmpDir}/script"); self._exec2(f"{tmpDir}/script", f"{tmpDir}/script") # Host
out, err = self._exec1(f"{shlex.quote(executable)} {shlex.quote(tmpDir + '/script')}") # Host
if fns is not None: # Host
files = fns | cli.apply(lambda fn: self._exec1(f"cat {shlex.quote(fn)}") | cli.item()) | cli.deref() # Host
if rmFn: fns | cli.apply(lambda fn: self._exec1(f"rm {shlex.quote(fn)}") | cli.item()) | cli.deref() # Host
else: files = [] # Host
None | cli.cmd(f"rm -rf {shlex.quote(tmpDir)}") | cli.deref(); self._exec1(f"rm -rf {shlex.quote(tmpDir)}"); return [out, err, *files] # Host
[docs] def execPy(self, c:str, fns=None, rmFn=False, pyExec=None): # Host
"""Executes some python code.
Examples::
If .fns is not specified, then will return (List[bytes], List[bytes]) containing (stdout, stderr).
If .fns is specified, then will return (List[bytes], List[bytes], List[bytes], ...) containing
(stdout, stderr, fn1, fn2, ...). Each file is a List[bytes], with endline byte at the end of each bytes chunk
:param c: Python commands
:param fns: file names to retrieve. Don't use paths that can expand, like "~/.bashrc" or
"$HOME/.bashrc", they won't be expanded
:param rmFn: if True, removes the files after running and exiting, else don't remove the
files. Some scripts auto generates files that should be removed, but others don't
:param pyExec: py executable used to run the generated python file, like "/usr/bin/python". If
not specified will try to auto detect what python binaries are available
""" # Host
if pyExec is None: # Host
ans = self._exec1("which python") # Host
if len(ans[0]) == 0: raise Exception("No python binary on PATH found. Please manually specify the location of the Python binary by setting .pyExec") # Host
pyExec = ans[0][-1].decode().strip("\r\n") # Host
return self._exec(c, fns, rmFn, pyExec) # Host
[docs] def execSh(self, c:str, fns=None, rmFn=False, shExec="/bin/bash"): # Host
"""Executes some bash code. Pretty much the same
as :meth:`execPy`, but for bash. See that method's docs.""" # Host
return self._exec(c, fns, rmFn, shExec) # Host
def __repr__(self): # Host
if self._connStatus is None: self._connStatus = len(self._exec1("pwd")[0]) > 0 # Host
u = f"user='{self.user}' " if self.user else ""; h = f"host='{self.host}' " if self.host else "" # Host
c = f"container='{self.container}' " if self.container else "" # Host
return f"<Host {u}{h}{c}connected={self._connStatus}>" # Host