Source code for k1lib.cli.lsext

# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
import k1lib as k1, os, json, k1lib, time, dill, threading
import k1lib.cli as cli
from collections import deque
from functools import lru_cache
from contextlib import contextmanager
__all__ = ["sql", "sqldb", "sqltable", "sqlrow", "minio", "s3", "s3bucket", "s3obj", "Redis"]
psycopg2 = k1.dep("psycopg2", "psycopg2-binary", "https://pypi.org/project/psycopg2/")
class PgConn:                                                                    # PgConn
    def __init__(self, p:"PgConns", db:str):                                     # PgConn
        self.p = p; self.db = db; self.conn = None                               # PgConn
    def _connect(self): p = self.p; self.conn = psycopg2.connect(host=p.host, port=p.port, database=self.db, user=p.user, password=p.password); self.conn.autocommit = True # PgConn
    def queryRaw(self, f):                                                       # PgConn
        if self.conn is None: self._connect()                                    # PgConn
        cur = self.conn.cursor(); res = f(cur); cur.close(); return res # absolute barebones: just prepares the cursor; handle any errors before calling this # PgConn
    def query(self, query, *args, mode=0): # modes: 0 (result), 1 ([res, desc]), 2 (assumes result is a table, then inserts desc to the top row) # PgConn
        def inner(cur):                                                          # PgConn
            res = None; desc = None; cur.execute(query, args)                    # PgConn
            try: res = cur.fetchall()                                            # PgConn
            except psycopg2.ProgrammingError: pass                               # PgConn
            if mode > 0: desc = [x.name for x in cur.description]                # PgConn
            return res if mode == 0 else ((res, desc) if mode == 1 else [desc, *res]) # PgConn
        return self.queryRaw(inner)                                              # PgConn
    def __repr__(self): return f"<PgConn host={self.p.host}:{self.p.port} db={self.db}>" # PgConn
class PgConns:                                                                   # PgConns
    def __init__(self, host:str, port:int, user:str, password:str):              # PgConns
        self.host = host; self.port = port; self.user = user; self.password = password; self.conns = {} # PgConns
    def __getitem__(self, key:str):                                              # PgConns
        if key not in self.conns: self.conns[key] = PgConn(self, key)            # PgConns
        return self.conns[key]                                                   # PgConns
    def __repr__(self): return f"<PgConns host={self.host}:{self.port}>"         # PgConns
class PgManager: # singleton that efficiently manages all connections from everywhere # PgManager
    def __init__(self): self.connss = {}                                         # PgManager
    def __call__(self, host, port, user, password):                              # PgManager
        key = (host, port, user, password)                                       # PgManager
        if key not in self.connss: self.connss[key] = PgConns(host, port, user, password) # PgManager
        return self.connss[key]                                                  # PgManager
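# A minimal usage sketch of the manager -> connections -> connection chain (assumes a reachable Postgres at
# 127.0.0.1:5432 with a database named "mydb"; illustrative comments only, nothing here runs on import):
#   conns = pgM("127.0.0.1", 5432, "admin", "admin")            # PgConns, cached per (host, port, user, password); pgM is the module-level PgManager() defined further down
#   conn  = conns["mydb"]                                       # PgConn, connects lazily on the first query
#   rows  = conn.query("select * from users where id = %s", 3)  # plain result rows
#   rows, names = conn.query("select * from users", mode=1)     # mode=1 also returns the column names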
mysqlConn = k1.dep("mysql.connector", "mysql-connector-python", "https://pypi.org/project/mysql-connector-python/") # PgManager
class MyConn:                                                                    # MyConn
    def __init__(self, p:"MyConns", db:str): self.p = p; self.db = db; self.conn = None; self._connect() # MyConn
    def _connect(self): p = self.p; self.conn = mysqlConn.connect(host=p.host, port=p.port, database=self.db, user=p.user, password=p.password, charset='utf8mb4', collation='utf8mb4_general_ci'); self.conn.autocommit = True # MyConn
    def queryRaw(self, f): cur = self.conn.cursor(); res = f(cur); cur.close(); return res # MyConn
    def query(self, query, *args, mode=0):                                       # MyConn
        def inner(cur):                                                          # MyConn
            res = None; desc = None; cur.execute(query, args)                    # MyConn
            res = cur.fetchall()                                                 # MyConn
            if mode > 0: desc = [x[0] for x in cur.description]                  # MyConn
            return res if mode == 0 else ((res, desc) if mode == 1 else [desc, *res]) # MyConn
        return self.queryRaw(inner)                                              # MyConn
    def __repr__(self): return f"<MyConn host={self.p.host}:{self.p.port} db={self.db}>" # MyConn
class MyConns:                                                                   # MyConns
    def __init__(self, host:str, port:int, user:str, password:str):              # MyConns
        self.host = host; self.port = port; self.user = user; self.password = password; self.conns = {} # MyConns
    def __getitem__(self, key:str):                                              # MyConns
        if key not in self.conns: self.conns[key] = MyConn(self, key)            # MyConns
        return self.conns[key]                                                   # MyConns
    def __repr__(self): return f"<MyConns host={self.host}:{self.port}>"         # MyConns
class MyManager:                                                                 # MyManager
    def __init__(self): self.connss = {}                                         # MyManager
    def __call__(self, host, port, user, password):                              # MyManager
        key = (host, port, user, password)                                       # MyManager
        if key not in self.connss: self.connss[key] = MyConns(host, port, user, password) # MyManager
        return self.connss[key]                                                  # MyManager
sqlite3 = k1.dep("sqlite3");                                                     # MyManager
class LiConn:                                                                    # LiConn
    def __init__(self, conn, fn): self.conn = conn; self.fn = fn; self._lastrowid = None; self.lock = threading.Lock() # LiConn
    def queryRaw(self, f):                                                       # LiConn
        with self.lock: cur = self.conn.cursor(); res = f(cur); self.conn.commit(); cur.close(); return res # LiConn
    def query(self, query, *args, mode=0):                                       # LiConn
        def inner(cur):                                                          # LiConn
            res = None; desc = None; cur.execute(query, args); res = cur.fetchall() # LiConn
            if mode > 0: desc = [x[0] for x in cur.description]                  # LiConn
            try: self._lastrowid = cur.lastrowid                                 # LiConn
            except: pass                                                         # LiConn
            return res if mode == 0 else ((res, desc) if mode == 1 else [desc, *res]) # LiConn
        return self.queryRaw(inner)                                              # LiConn
    def __repr__(self): return f"<LiConn fn={self.fn} db='default'>"             # LiConn
class LiConns:                                                                   # LiConns
    def __init__(self, fn:str): self.conn = LiConn(sqlite3.connect(fn, check_same_thread=False), fn); self.fn = fn # LiConns
    def __getitem__(self, key:str): return self.conn                             # LiConns
    def __repr__(self): return f"<LiConns fn={self.fn}>"                         # LiConns
class LiManager:                                                                 # LiManager
    def __init__(self): self.connss = {}                                         # LiManager
    def __call__(self, host:str, port, user, password):                          # LiManager
        if host == ":memory:": return LiConns(host) # no caching, cause each instance is different! # LiManager
        host = os.path.expanduser(host)                                          # LiManager
        if host not in self.connss: self.connss[host] = LiConns(host)            # LiManager
        return self.connss[host]                                                 # LiManager
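# All three managers expose the same (host, port, user, password) call signature so that `sql` below can treat
# them uniformly; for sqlite the "host" is just a file path (or ":memory:"), and port/user/password are ignored.
# A minimal sketch (opens/creates a local "test.db" file; illustrative comments only):
#   conn = liM("test.db", None, None, None)["default"]  # liM is the module-level LiManager() instantiated right below
#   conn.query("create table if not exists users (id INTEGER primary key autoincrement, name VARCHAR(50))")
#   conn.query("insert into users (name) values (?)", "Reimu"); conn.query("select * from users", mode=2)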
pgM = PgManager(); myM = MyManager(); liM = LiManager()                          # LiManager
settings = k1lib.settings                                                        # LiManager
@contextmanager                                                                  # LiManager
def mysqlCnf(user, password, host, port):                                        # mysqlCnf
    fn = f"""[client]\nuser = "{user}"\npassword = "{password or ''}"\nhost = "{host}"\nport = "{port}" """ | cli.file() # mysqlCnf
    try: yield fn                                                                # mysqlCnf
    finally: os.remove(fn)                                                       # mysqlCnf
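# mysqlCnf() writes the credentials into a temporary --defaults-file so that external tools (mysqldump, mysql)
# never see the password on their command line, then deletes that file once the `with` block exits.
# A minimal sketch (hypothetical credentials; illustrative comments only):
#   with mysqlCnf("admin", "admin", "127.0.0.1", 3306) as fn:
#       None | cli.cmd(f"mysqldump --defaults-file={fn} --all-databases") | cli.ignore()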
rowCaches = set() # k1lib.cache objects for caching table[id]                    # mysqlCnf
def cb_rowCache_size(s,v):                                                       # cb_rowCache_size
    for rC in rowCaches: rC.maxsize = v                                          # cb_rowCache_size
def cb_rowCache_timeout(s, v):                                                   # cb_rowCache_timeout
    for rC in rowCaches: rC.timeout = v                                          # cb_rowCache_timeout
settings.cred.add("sql", k1lib.Settings()                                        # cb_rowCache_timeout
    .add("mode", "pg", env="K1_SQL_MODE")                                        # cb_rowCache_timeout
    .add("host", "127.0.0.1", "host name of db server, or db file name if mode='lite'. Warning: mysql's mysqldump won't resolve domain names, so it's best to pass in ip addresses", env="K1_SQL_HOST") # cb_rowCache_timeout
    .add("port", 3306, env="K1_SQL_PORT")                                        # cb_rowCache_timeout
    .add("user", "admin", sensitive=True, env="K1_SQL_USER")                     # cb_rowCache_timeout
    .add("password", "admin", sensitive=True, env="K1_SQL_PASSWORD")             # cb_rowCache_timeout
    .add("verbose", False, "if True, will print out all executed queries")       # cb_rowCache_timeout
    .add("sanitize", False, "if True, will sanitize all string columns with MarkupSafe", env=("K1_SQL_SANITIZE", lambda x: x.lower()[0] == "t")) # cb_rowCache_timeout
    .add("cache", k1lib.Settings()                                               # cb_rowCache_timeout
         .add("size", 3000, "Size of the cache, 0 to disable it", cb_rowCache_size, env=("K1_SQL_CACHE_SIZE", lambda x: int(x))) # cb_rowCache_timeout
         .add("timeout", 1, "After this number of seconds, the cached item will expire", cb_rowCache_timeout, env=("K1_SQL_CACHE_TIMEOUT", lambda x: float(x))), "Cache settings for table accesses, i.e. table[id]") # cb_rowCache_timeout
, "anything related to sql, used by k1lib.cli.lsext.sql. See docs for that class for more details") # cb_rowCache_timeout
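# These credentials can also be set from the environment before the process starts, so that `sql()` with no
# arguments just works. A minimal sketch (hypothetical values; shell first, then Python):
#   export K1_SQL_MODE=pg K1_SQL_HOST=127.0.0.1 K1_SQL_PORT=5432 K1_SQL_USER=admin K1_SQL_PASSWORD=admin
# or, equivalently, from Python after importing k1lib:
#   settings.cred.sql.mode = "lite"; settings.cred.sql.host = "~/data.db"  # for sqlite, host is the db file name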
markupsafe = k1.dep("markupsafe", "MarkupSafe", "https://pypi.org/project/MarkupSafe/") # cb_rowCache_timeout
default = object(); qD = {"my": "`", "pg": "", "lite": ""} # quote dict          # cb_rowCache_timeout
def stripMemView(x): return bytes(x) if isinstance(x, memoryview) else x # postgresql returns memory view objects for bytea columns, instead of just raw bytes, so gotta strip it # stripMemView
class alldump(list): pass # dump of all databases                                # alldump
class sql: # sql
    def __init__(self, host=default, port=default, user=default, password=default, mode=default): # sql
        """Creates a connection to a SQL database. Example::

            s = sql("127.0.0.1") # creates a new sql object. Apparently, mysql and mysqldump on the command line bug out if you put "localhost". If you put localhost here, it should work fine, but `sql(...) | toBytes()` and other functions that use external programs can deny you access
            s.refresh()          # refreshes connection any time you encounter strange bugs
            s | ls()             # returns List[sqldb], lists out all databases
            s | toBytes()        # returns Iterator[str], dumps every database. Yes, it's "toBytes", but it has the feeling of serializing whatever the input is, so it's at least intuitive in that way
            "dump.sql" | s       # restores the database using the dump file
            cat("dump.sql") | s  # restores the database using the dump file
            db1 = s["db1"]       # returns sqldb, gets database named "db1"
            db1 | ls()           # returns List[sqltable], lists out all tables within this database
            db1 | toBytes()      # returns Iterator[str], dumps the database
            users = db1["user"]  # gets table named "user", short and simple
            db1.query("select * from users")                      # queries the database using your custom query
            db1.query("select * from users where user_id=%s", 3)  # queries with prepared statement
            users.info()         # prints out the first 10 rows of the table and the table schema
            users.cols           # returns table's columns as List[str]
            len(users)           # returns number of rows
            users.query(...)     # can also do a custom query, just like with databases
            users[1]             # grabs user with id 1. Returns None if user doesn't exist
            users[1:10]          # grabs users with id from 1 (inclusive) to 10 (exclusive)
            users[1].firstname = "Reimu"                              # sets the first name of user with id 1 to "Reimu"
            users[1] = {"firstname": "Reimu", "lastname": "Hakurei"}  # sets first name and last name of user with id 1
            for user in users: user.firstname = "Reimu"               # loops through all users and changes the firstname to "Reimu"
            users.insert(firstname="Yuyuko", lastname="Saigyouji")    # inserts a new row
            users.insertBulk(firstname=["Yuyuko", "Marisa"], lastname=["Saigyouji", "Kirisame"]) # inserts multiple rows at once
            users | toBytes()          # dumps this specific table, returns Iterator[str]
            users | cat() | display()  # reads entire table, gets first 10 rows and displays them
            users | (cat() | head(20)) | display()                     # reads first 20 rows only, then displays the first 10 rows. Query sent is "select * from user limit 20"
            users | (cat() | filt("x == 4", 3) | head(20)) | display() # grabs first 20 rows that have the 4th column equal to 4, then displays the first 10 rows. Query sent is "select user_id, address, balance, age from user where age = 4", assuming the table only has those columns

        Philosophy for these methods is that they should be intuitive to look at and interact with, not built for performance. If performance is needed, just write raw queries, or don't use a database at all and use applyCl() instead.

        If any of the params here are not specified, they will take on the values from "settings.cred.sql", which can be initialized from environment variables. Execute and display/print "settings" in a new cell to see all of them.

        Create table commands for different engines, for quick reference. Postgresql::

            \"\"\" CREATE TABLE users (
                id bigserial primary key,
                name VARCHAR(50),
                age INT,
                time BIGINT,
                groupIds BIGINT[],
                data JSON
            );\"\"\"

        Sqlite::

            \"\"\" CREATE TABLE users (
                id INTEGER primary key autoincrement,
                name VARCHAR(50),
                age INT,
                time BIGINT,
                groupIds BIGINT[],
                data JSON
            );\"\"\"

        :param host: host name, ip address, or file name (in case of sqlite)
        :param port: port at the host
        :param user: database user name. If not specified then fallback to environment variable ``SQL_USER``, then ``USER``
        :param password: database password. If not specified then assume database doesn't require one
        :param mode: currently supports 3 values: "my" (MySQL), "pg" (PostgreSQL) and "lite" (SQLite)""" # sql
        host = settings.cred.sql.host if host is default else host; port = settings.cred.sql.port if port is default else port; user = settings.cred.sql.user if user is default else user # sql
        password = settings.cred.sql.password if password is default else password; mode = settings.cred.sql.mode if mode is default else mode # sql
        if mode not in ("my", "pg", "lite"): raise Exception(f"Supports only 'my' (MySQL), 'pg' (PostgreSQL) and 'lite' (SQLite) for now, don't support {mode}") # sql
        self.host = host; self.port = port; self.user = user or os.environ.get("SQL_USER") or os.environ.get("USER") # sql
        self.password = password or os.environ.get("SQL_PASSWORD"); self.mode = mode; self.star = "%s" if mode == "my" or mode == "pg" else "?" # sql
        self.manager = {"pg": pgM, "my": myM, "lite": liM}[mode]; self.dbs = None; self.dbsL = None # lower case version # sql
        self.conn = self.manager(self.host, self.port, self.user, self.password)[None] # sql
    def query(self, query, *args): # sql
        """Executes a query. Returns the resulting table (if select query). Example::

            s.query("insert into users (name, age) values (%s, %s)", "Reimu", 25)
        """ # sql
        return self.conn.query(query, *args) # sql
    @k1.cache(timeout=10, maxsize=1000, name="sql.ls", docs="Caches result of listing out all databases") # sql
    def _ls(self): # sql
        if self.mode == "my": self.dbs = {e[0]: sqldb(self, e[0]) for e in self.query("show databases")} # sql
        elif self.mode == "pg": self.dbs = {e[0]: sqldb(self, e[0]) for e in self.query("select datname from pg_database where datistemplate=false")} # sql
        elif self.mode == "lite": self.dbs = {"default": sqldb(self, "default")} # sql
        self.dbsL = {k.lower():v for k,v in self.dbs.items()} # sql
        return list(self.dbs.values()) # sql
    def __repr__(self): return f"<sql mode={self.mode} host={self._host}>" # sql
    @property # sql
    def _host(self) -> str: # sql
        if self.mode == "lite": return self.host.split(os.sep)[-1] # sql
        else: return f"{self.host}:{self.port}" # sql
    def _cnfCtx(self): return mysqlCnf(self.user, self.password, self.host, self.port) # sql
    def _toBytes(self): # sql
        if self.mode == "my": # sql
            with self._cnfCtx() as fn: k1lib.depCli("mysqldump"); return alldump(None | cli.cmd(f"mysqldump --defaults-file={fn} --single-transaction --hex-blob --all-databases")) # sql
        elif self.mode == "pg": k1lib.depCli("pg_dumpall"); return alldump(None | cli.cmd(f"PGPASSWORD={self.password} pg_dumpall -h {self.host} -p {self.port} -U {self.user}")) # sql
        else: raise Exception(f"All databases dump of mode {self.mode} is not supported yet") # sql
    def __ror__(self, it): # restoring a backup # sql
        if self.mode == "my": # sql
            def restore(fn): # sql
                k1lib.depCli("mysql") # sql
                with self._cnfCtx() as cnfFn: None | cli.cmd(f"mysql --defaults-file={cnfFn} < {fn}") | cli.ignore() # sql
            if isinstance(it, str): restore(it) # sql
            else: fn = it | cli.file(); restore(fn); os.remove(fn) # sql
        elif self.mode == "pg": # sql
            if isinstance(it, str): fn = it; isStr = True # sql
            elif not isinstance(it, alldump): raise Exception("The sql commands piped in are from a specific database, while you haven't chosen a particular database from this sql connection yet. Do something like `s = sql(...); db = s['some_db_name']; ['some sql'] | db`, instead of `['some sql'] | s` like you're doing now") # sql
            else: fn = it | cli.file(); isStr = False # sql
            try: None | cli.cmd(f"PGPASSWORD={self.password} psql -h {self.host} -p {self.port} -U {self.user} -d postgres < {fn}") | cli.ignore() # sql
            finally: # sql
                if not isStr: os.remove(fn) # sql
        else: raise Exception(f"Restoring database from .sql file of mode {self.mode} is not supported yet") # sql
    def __len__(self): return len(self._ls()) # sql
    def __getitem__(self, idx): # sql
        self._ls() # sql
        if idx in self.dbs: return self.dbs[idx] # sql
        if idx.lower() in self.dbsL: return self.dbsL[idx.lower()] # sql
        raise Exception(f"Database '{idx}' does not exist, can't retrieve it") # sql
    def __iter__(self): return iter(self._ls()) # sql
    def __delitem__(self, idx): # sql
        if isinstance(idx, str): # sql
            db = self[idx] # sql
            if self.mode == "my": return self.query(f"drop database {idx}") # sql
            elif self.mode == "pg": return self.query(f"drop database {idx} with (force)") # sql
            else: raise Exception("Currently only support dropping databases in mysql and postgresql") # sql
        else: raise Exception("Only supports deleting with database names. Use this like `del s['some_db']`") # sql
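# A backup/restore round trip using the operations defined above (a sketch only; assumes valid credentials in
# settings.cred.sql and that the relevant dump/restore command line tools are installed):
#   s = sql()                                        # connect using settings.cred.sql
#   s | cli.toBytes() | cli.file("dump.sql")         # dump all databases into a file
#   "dump.sql" | s                                   # restore every database from that dump
#   s["db1"] | cli.toBytes() | cli.file("db1.sql")   # or dump a single database instead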
class dbdump(list): pass # dbdump
class sqldb: # sqldb
    def __init__(self, sql:sql, name:str): # sqldb
        """A sql database representation. Not expected to be instantiated by you. See also: :class:`sql`""" # sqldb
        self.sql = sql; self.name = name; self.tbls = {}; self.tblsL = {}; s = sql; self.conn = s.manager(s.host, s.port, s.user, s.password)[name] # sqldb
    def query(self, query, *args, mode=0): # sqldb
        """Executes a query in this database. Returns the resulting table (if select query). Example::

            db = ...
            db.query("insert into users (name, age) values (%s, %s)", "Reimu", 25)
        """ # sqldb
        return self.conn.query(query, *args, mode=mode) # sqldb
    @k1.cache(timeout=10, maxsize=1000, name="sqldb.ls", docs="Caches result of listing out all tables") # sqldb
    def _ls(self): # sqldb
        if self.sql.mode == "my": self.tbls = {e[0]: sqltable(self.sql, self, e[0]) for e in self.query(f"show tables")} # sqldb
        if self.sql.mode == "pg": self.tbls = {e[0]: sqltable(self.sql, self, e[0]) for e in self.query(f"select table_name from information_schema.tables where table_schema = 'public'")} # sqldb
        if self.sql.mode == "lite": self.tbls = {e[0]: sqltable(self.sql, self, e[0]) for e in self.query("select name from sqlite_master where type='table'")} # sqldb
        self.tblsL = {k.lower():v for k,v in self.tbls.items()}; return list(self.tbls.values()) # sqldb
    def __repr__(self): return f"<sqldb host={self.sql._host} db={self.name}>" # sqldb
    def _toBytes(self): # sqldb
        if self.sql.mode == "my": # sqldb
            with self.sql._cnfCtx() as fn: return dbdump(None | cli.cmd(f"mysqldump --defaults-file={fn} --single-transaction --hex-blob --databases {self.name}")) # sqldb
        elif self.sql.mode == "pg": return dbdump(None | cli.cmd(f"PGPASSWORD={self.sql.password} pg_dump -h {self.sql.host} -p {self.sql.port} -U {self.sql.user} -d {self.name}")) # sqldb
        else: raise Exception(f"Database dump of mode {self.sql.mode} is not supported yet") # sqldb
    def __ror__(self, it): # sqldb
        mode = self.sql.mode; q = qD[mode] # sqldb
        if mode == "my": # sqldb
            if isinstance(it, str): # sqldb
                with open(it, "r") as f: it = f.readlines() # loads file to ram # sqldb
            return self.sql.__ror__([f"USE {q}{self.name}{q};", *it]) # sqldb
        elif mode == "pg": # sqldb
            if isinstance(it, str): fn = it; isStr = True # sqldb
            elif isinstance(it, alldump): self.sql.__ror__(it); return # sqldb
            else: fn = it | cli.file(); isStr = False # sqldb
            try: None | cli.cmd(f"PGPASSWORD={self.sql.password} psql -h {self.sql.host} -p {self.sql.port} -U {self.sql.user} -d {self.name} < {fn}") | cli.ignore() # sqldb
            finally: # sqldb
                if not isStr: os.remove(fn) # sqldb
    def __len__(self): return len(self._ls()) # sqldb
    def __getitem__(self, idx): # sqldb
        res = self._ls() # sqldb
        if isinstance(idx, str): # sqldb
            if idx in self.tbls: return self.tbls[idx] # sqldb
            if idx.lower() in self.tblsL: return self.tblsL[idx.lower()] # sqldb
        else: return res[idx] # sqldb
    def __iter__(self): return iter(self._ls()) # sqldb
    def __delitem__(self, idx): # sqldb
        if isinstance(idx, str): # sqldb
            if self.sql.mode == "pg": idx = idx.lower() # sqldb
            lsres = [x for x in self._ls() if x.name == idx] # sqldb
            if len(lsres) == 1: return self.query(f"drop table {idx}") # sqldb
            else: raise Exception(f"Table '{idx}' does not exist, can't delete it") # sqldb
        else: raise Exception("Only supports deleting with table names. Use this like `del db['some_table']`") # sqldb
class tbldump(list): pass # tbldump
class sqltable: # sqltable
    def __init__(self, sql, sqldb, name:str): # sqltable
        """A sql table representation. Not expected to be instantiated by you. See also: :class:`sql`""" # sqltable
        self.sql = sql; self.sqldb = sqldb; self.name = name; self._cols = None; self.__col2Type = None # sqltable
    def _col2Type(self): # sqltable
        if self.__col2Type == None: mode = self.sql.mode; i1,i2 = [1,2] if mode == "lite" else [0,1]; self.__col2Type = {row[i1]: row[i2].lower() for row in self._describe()[1:]} # sqltable
        return self.__col2Type # sqltable
    def _cat(self, ser): # sqltable
        cols = self.cols; _2 = [] # clis that can't be optimized, stashed away to be merged with ser later on # sqltable
        q = qD[self.sql.mode]; clis = deque(ser.clis); o1 = None; o2 = None; o3 = [] # cut(), head() and filt() opts # sqltable
        while len(clis) > 0: # sqltable
            c = clis.popleft() # sqltable
            if isinstance(c, cli.filt): _2.append(c); break # TODO: add optimizations for filt # sqltable
            elif o2 is None and isinstance(c, cli.head): # sqltable
                if round(c.n) != c.n or c.n < 0 or c.inverted or c.n == None: _2.append(c); break # sqltable
                else: o2 = f"limit {c.n}"; continue # sqltable
            elif o1 is None and isinstance(c, cli.cut): # sqltable
                if isinstance(c.columns, slice): _2.append(c); o1 = 0; continue # sqltable
                else: # sqltable
                    o1 = ", ".join([f"{q}{c}{q}" for c in cols | cli.rows(*c.columns)]) # sqltable
                    if len(c.columns) == 1: _2.append(cli.item().all() | cli.aS(list)) # sqltable
            else: _2.append(c); break # sqltable
        o1 = o1 or ", ".join([f"{q}{c}{q}" for c in cols]) # sqltable
        query = f"select {o1} from {q}{self.name}{q} {o2 or ''}"#; print(f"query: {query}"); return [] # sqltable
        sql = self.sql; return [sqlrow(sql, self, row) for row in self.sqldb.query(query) | cli.serial(*_2, *clis)] # sqltable
    @property # sqltable
    def cols(self): # sqltable
        """Get column names. Example::

            db.cols # returns List[str], like ["id", "userId", "name", ...]
        """ # sqltable
        if not self._cols: self._cols = self._describe()[1:] | cli.cut({"my": 0, "pg": 0, "lite": 1}[self.sql.mode]) | cli.deref() # sqltable
        return self._cols # sqltable
    @lru_cache # sqltable
    def _describe(self): # sqltable
        if self.sql.mode == "my": return self.sqldb.query(f"describe `{self.name}`") | cli.insert(["Field", "Type", "Null", "Key", "Default", "Extra"]) | cli.deref() # sqltable
        if self.sql.mode == "pg": return self.sqldb.query(f"select column_name, data_type, is_nullable, column_default, ordinal_position from information_schema.columns where table_name='{self.name}' order by ordinal_position") | cli.insert(["column_name", "data_type", "is_nullable", "column_default", "ordinal_position"]) | cli.deref() # sqltable
        if self.sql.mode == "lite": return self.sqldb.query(f"pragma table_info([{self.name}])") | cli.insert(["cid", "name", "type", "notnull", "dflt_value", "pk"]) | cli.deref() # sqltable
    def insert(self, **kwargs): # sqltable
        """Inserts a row. Example::

            table = ...
            table.insert(firstname="Yuyuko", lastname="Saigyouji")
        """ # sqltable
        q = qD[self.sql.mode]; keys = ", ".join([f"{q}{x}{q}" for x in kwargs.keys()]); values = ", ".join([self.sql.star]*len(kwargs)) # sqltable
        mode = self.sql.mode; san = settings.cred.sql.sanitize; vs = []; _typeD = self._col2Type() # sqltable
        for k, v in kwargs.items(): # sqltable
            if isinstance(v, dict) or _typeD.get(k, "") == "json": vs.append(json.dumps(v)) # sqltable
            elif isinstance(v, (set, tuple)) or _typeD.get(k, "") == "array" or _typeD.get(k, "").endswith("[]"): vs.append((None if v is None else list(v)) if mode == "pg" else json.dumps(list(v))) # sqltable
            elif san and isinstance(v, str): vs.append(markupsafe.escape(v)) # sqltable
            else: vs.append(v) # sqltable
        if mode == "my" or mode == "lite": # sqltable
            self.query(f"insert into {self.name} ({keys}) values ({values})", *vs) # sqltable
            if mode == "my": return self[self.query("select last_insert_id()")[0][0]] # sqltable
            if mode == "lite": return self[self.sql.conn._lastrowid] # sqltable
        elif mode == "pg": return self[self.query(f"insert into {self.name} ({keys}) values ({values}) returning id", *vs)[0][0]] # sqltable
    def insertBulk(self, **kwargs): # sqltable
        """Inserts multiple rows at once. Example::

            table = ...
            table.insertBulk(firstname=["Yuyuko", "Marisa"], lastname=["Saigyouji", "Kirisame"])
        """ # sqltable
        s = set([len(x) for x in kwargs.values()]) # sqltable
        if len(s) == 0: return # sqltable
        if len(s) != 1: raise Exception("Different parameters have different number of rows!") # sqltable
        q = qD[self.sql.mode]; keys = ", ".join([f"{q}{x}{q}" for x in kwargs.keys()]); values = ", ".join([self.sql.star]*len(kwargs)) # sqltable
        mode = self.sql.mode; san = settings.cred.sql.sanitize; vs = []; _typeD = self._col2Type() # sqltable
        for k, v in kwargs.items(): # sqltable
            if isinstance(v[0], dict) or _typeD.get(k, "") == "json": vs.append([json.dumps(x) for x in v]) # sqltable
            elif isinstance(v[0], (set, tuple)) or _typeD.get(k, "") == "array" or _typeD.get(k, "").endswith("[]"): vs.append((None if v is None else list(v)) if mode == "pg" else json.dumps(list(v))) # sqltable
            elif san and isinstance(v[0], str): vs.append([markupsafe.escape(x) for x in v]) # sqltable
            else: vs.append(v) # sqltable
        return self.sqldb.conn.queryRaw(lambda cur: cur.execute(f"insert into {self.name} ({keys}) values ".encode() + b','.join(cur.mogrify("("+",".join(["%s"]*len(kwargs))+")", x) for x in list(vs | cli.T())))) # sqltable
    def _update(self, _idx, **kwargs): # sqltable
        """Updates a row. Example::

            table = ...
            table._update(3, firstname="Youmu", lastname="Konpaku")
        """ # sqltable
        q = qD[self.sql.mode]; p1 = ", ".join([f"{q}{k}{q} = {self.sql.star}" for k in kwargs.keys()]) # sqltable
        mode = self.sql.mode; san = settings.cred.sql.sanitize; values = []; _typeD = self._col2Type() # sqltable
        for k, v in kwargs.items(): # sqltable
            if isinstance(v, dict) or _typeD.get(k, "") == "json": values.append(json.dumps(v)) # sqltable
            elif isinstance(v, (set, tuple)) or _typeD.get(k, "") == "array" or _typeD.get(k, "").endswith("[]"): values.append((None if v is None else list(v)) if mode == "pg" else json.dumps(list(v))) # sqltable
            elif san and isinstance(v, str): values.append(markupsafe.escape(v)) # sqltable
            else: values.append(v) # sqltable
        self.query(f"update {self.name} set {p1} where {self.idCol} = {_idx}", *values) # sqltable
    def info(self, out=False): # sqltable
        """Preview table. Example::

            table = ...
            table.info()

        :param out: if True, returns a list of lines instead of printing them out""" # sqltable
        def gen(): # sqltable
            desc = self._describe() | cli.deref(); cols = self.cols; mode = self.sql.mode; q = qD[mode]; s = ", ".join([f"{q}{e}{q}" for e in cols]) # sqltable
            if mode == "my": # sqltable
                status = self.sqldb.query(f"show table status like '{self.name}'")[0] # sqltable
                status = f"engine:{status[1]}, #rows(approx):{status[4]}, len(row):{status[5]}, updated:{status[12]}UTC, created:{status[11]}UTC" # sqltable
            elif mode == "lite": # sqltable
                nrows = self.query(f"select max(rowid) from {q}{self.name}{q}")[0][0] # sqltable
                try: ncols = len(self.query(f"select * from {q}{self.name}{q} limit 1")[0]) # sqltable
                except: ncols = "?" # sqltable
                status = f"#rows(approx):{nrows}, len(row):{ncols}" # sqltable
            else: nrows = self.sqldb.query(f"select count(*) from {q}{self.name}{q}")[0][0]; status = f"{nrows} rows total" # sqltable
            idCol = "rowid" if mode == "lite" else self.idCol; print(f"Table `{self.name}` ({status})\n"); self.sqldb.query(f"select {s} from {q}{self.name}{q} order by {idCol} limit 9") | (cli.aS(stripMemView) | cli.aS(repr) | cli.head(50)).all(2) | cli.insert(cols) | cli.display() # sqltable
            try: # print tails # sqltable
                print("...\n..."); self.sqldb.query(f"select {s} from {q}{self.name}{q} order by {idCol} desc limit 9")\
                    | cli.reverse() | (cli.aS(stripMemView) | cli.aS(repr) | cli.head(50)).all(2) | cli.insert(cols) | cli.display() # sqltable
            except: pass # sqltable
            print("\nTable format:"); desc | cli.display(None) # sqltable
            if mode == "my": print("\nIndexes:"); self.sqldb.query(f"show indexes from {q}{self.name}{q}", mode=2) | cli.display() # sqltable
            elif mode == "pg": print("\nIndexes:"); self.sqldb.query(f"SELECT tablename, indexname, indexdef FROM pg_indexes WHERE schemaname = 'public' and tablename = '{self.name}'") | cli.display() # sqltable
        if out: # sqltable
            with k1.captureStdout() as out: gen() # sqltable
            return out() # sqltable
        else: gen() # sqltable
    def _ls(self): return self.info() # sqltable
    def query(self, query, *args): # sqltable
        """Executes a query in this table's database. Returns the resulting table (if select query). Example::

            table = ...
            table.query("insert into users (name, age) values (%s, %s)", "Reimu", 25)
        """ # sqltable
        return self.sqldb.query(query, *args) # sqltable
    def queryDesc(self, query, *args): return self.sqldb.queryDesc(query, *args) # sqltable
    def select(self, query, *args): # sqltable
        """Pretty much identical to :meth:`query`, but this postprocesses the results a little bit and packages them into :class:`sqlrow` objects. Example::

            table = ...
            table.select("where name = %s", "Reimu")
        """ # sqltable
        lq = query.strip().lower() # sqltable
        if lq.startswith("select") and not lq.startswith("select *"): raise Exception(".select() requires the query to start with 'select *', or remove the 'select * from table' statement entirely!") # sqltable
        if not lq.startswith("select"): query = f"select * from {self.name} {query}" # sqltable
        query = query.replace("select *", f"select {self.scols}").replace("SELECT *", f"select {self.scols}") # sqltable
        res = self.query(query, *args); sql = self.sql; return [sqlrow(sql, self, e) for e in res] # sqltable
    def __repr__(self): return f"<sqltable host={self.sql._host} db={self.sqldb.name} table={self.name}>" # sqltable
    def _toBytes(self): # sqltable
        if self.sql.mode == "my": # sqltable
            with self.sql._cnfCtx() as fn: return tbldump(None | cli.cmd(f"mysqldump --defaults-file={fn} --single-transaction --hex-blob {self.sqldb.name} {self.name}")) # sqltable
        elif self.sql.mode == "pg": return tbldump(None | cli.cmd(f"PGPASSWORD={self.sql.password} pg_dump -h {self.sql.host} -p {self.sql.port} -U {self.sql.user} -d {self.sqldb.name} -t {self.name}")) # sqltable
        else: raise Exception(f"Table dump of mode {self.sql.mode} is not supported yet") # sqltable
    def __ror__(self, it): return self.sqldb.__ror__(it) # sqltable
    def __len__(self): return self.query(f"select count(*) from {self.name}")[0][0] # sqltable
    def __iter__(self): return self.select("select * from valves") # sqltable
    @property # sqltable
    def scols(self): q = qD[self.sql.mode]; return ", ".join([f"{q}{c}{q}" for c in self.cols]) # string columns # sqltable
    def lookup(self, **kwargs): # sqltable
        """Convenience function to look up 1 instance with the specified value. Example::

            user = users.lookup(firstname="Reimu")
            # multiple columns work too
            user = users.lookup(firstname="Reimu", lastname="Hakurei")
        """ # sqltable
        p1 = " AND ".join([f"{k} = {self.sql.star}" for k in kwargs.keys()]) # sqltable
        res = self.query(f"select {self.scols} from {self.name} where {p1} limit 1", *kwargs.values()) # sqltable
        return None if len(res) == 0 else sqlrow(self.sql, self, res[0]) # sqltable
    @property # sqltable
    def idCol(self): # sqltable
        if self.sql.mode == "pg": # postgresql doesn't really respect column order, so defining "id" as the first column wouldn't work reliably. So this just searches for a column called "id"; if found, uses that for indexing operations, else uses the first column like my or lite # sqltable
            return "id" if len([x for x in self.cols if x == "id"]) > 0 else self.cols[0] # sqltable
        else: return self.cols[0] # sqltable
    @property # sqltable
    def idColPos(self): idCol = self.idCol; return [i for i,x in enumerate(self.cols) if x == idCol][0] if self.sql.mode == "pg" else 0 # sqltable
    _cache = k1.cache(settings.cred.sql.cache.size, settings.cred.sql.cache.timeout, name="sqltable.getitem", docs="Caches result of table[idx]"); rowCaches.add(_cache) # sqltable
    @_cache # sqltable
    def __getitem__(self, idx): # sqltable
        mode = self.sql.mode; idCol = self.idCol; idColPos = self.idColPos # sqltable
        q = qD[mode]; scols = ", ".join([f"{q}{c}{q}" for c in self.cols]) # sqltable
        if idx is None: return None # sqltable
        elif isinstance(idx, (int, str)): # sqltable
            res = self.query(f"select {scols} from {self.name} where {idCol} = {self.sql.star}", idx) # sqltable
            if len(res) == 0: return None # sqltable
            return sqlrow(self.sql, self, res[0]) # sqltable
        elif isinstance(idx, slice): # sqltable
            idxs = list(range(idx.stop if idx.stop is not None else (self.query(f"select max({idCol}) from {self.name}")[0][0]+1))[idx]) # sqltable
            if len(idxs) == 0: return [] # sqltable
            res = self.query(f"select {scols} from {self.name} where {idCol} in ({', '.join([str(x) for x in idxs])})") # sqltable
            d = {row[idColPos]:row for row in res}; sql = self.sql; return [sqlrow(sql, self, d[idx]) if d.get(idx, None) else None for idx in idxs] # sqltable
        else: # sqltable
            idxs = list(idx) # sqltable
            if all([isinstance(e, int) for e in idxs]): # does not support strings because that might lead to sql injection. Can fix, but too lazy # sqltable
                if len(idxs) == 0: return [] # sqltable
                res = self.query(f"select {scols} from {self.name} where {idCol} in ({', '.join([str(e) for e in idx])})") # sqltable
                d = {row[idColPos]:row for row in res}; sql = self.sql; return [sqlrow(sql, self, d[idx]) if d.get(idx, None) else None for idx in idxs] # sqltable
            raise Exception("Only support table indexing of integers or strings or slices or list[int]") # sqltable
    def __setitem__(self, idx, value): # sqltable
        if not isinstance(value, dict): raise Exception("Only accepts setting elements with dicts") # sqltable
        if self[idx] is None: raise Exception(f"Can't set element with id {idx}, it doesn't seem to exist. Use table.insert(col1=value1, col2=value2) instead") # sqltable
        self._update(idx, **value) # sqltable
    def __iter__(self): return iter(self | cli.cat()) # sqltable
    def __delitem__(self, idx): # sqltable
        idCol = self.idCol; idColPos = self.idColPos # sqltable
        if isinstance(idx, (int, str)): # sqltable
            res = self[idx] # sqltable
            if res: self.query(f"delete from {self.name} where {idCol} = {self.sql.star}", res[idColPos]) # sqltable
        elif isinstance(idx, slice): # sqltable
            res = self[idx]; ids = ", ".join([str(row[idColPos]) for row in res if row]) # sqltable
            if len(res) > 0: self.query(f"delete from {self.name} where {idCol} in ({ids})") # sqltable
        else: raise Exception("Only support table indexing of integers or strings or slices") # sqltable
class sqlrow: # sqlrow
    def __init__(self, sql, sqltable, row): # sqlrow
        row = [stripMemView(x) for x in row]; self._ab_sentinel = True; self._sql = sql; self._sqltable = sqltable # sqlrow
        self._row = row; _typeD = sqltable._col2Type(); self._data = {} # sqlrow
        if sql.mode == "lite": # sqlrow
            for k, v in zip(sqltable.cols, row): _type = _typeD.get(k, ""); self._data[k] = json.loads(v) if _type.endswith("[]") or _type == "json" else v # sqlrow
        else: self._data = {k:v for k,v in zip(sqltable.cols, row)} # sqlrow
        self._ab_sentinel = False # sqlrow
        # if sql.mode == "pg": self.__dict__.update({k.lower():v for k,v in zip(sqltable.cols, row)}) # add case insensitive cases # sqlrow
    def __getitem__(self, idx): return self._row[idx] # sqlrow
    def __getattr__(self, attr): # sqlrow
        if attr in self._data: return self._data[attr] # sqlrow
        if attr.lower() in self._data and self._sql.mode == "pg": return self._data[attr.lower()] # sqlrow
        return self.__dict__[attr] # sqlrow
    def __setattr__(self, attr, value): # sqlrow
        if attr == "_ab_sentinel": self.__dict__[attr] = value # sqlrow
        else: # sqlrow
            if self._ab_sentinel: self.__dict__[attr] = value; return # sqlrow
            attr = attr.lower() if self._sql.mode == "pg" else attr # sqlrow
            if attr not in self._sqltable.cols: self.__dict__[attr] = value # sqlrow
            else: # sqlrow
                self._sqltable._update(self._row[self._sqltable.idColPos], **{attr: value}); self.__dict__[attr] = value # sqlrow
                idx = {x:y for x,y in zip(self._sqltable.cols, range(len(self._row)))}[attr] # sqlrow
                row = list(self._row); row[idx] = value; self._row = row; self._data[attr] = value # sqlrow
    def json(self): return self._data # sqlrow
    def __len__(self): return len(self._row) # sqlrow
    def __repr__(self): # sqlrow
        ans = [] # sqlrow
        for k, elem in self._data.items(): # sqlrow
            if isinstance(elem, str): ans.append(f"{k}=({len(elem)} len) {json.dumps(elem[:100])}..." if len(elem) > 100 else f"{k}=({len(elem)} len) {json.dumps(elem)}") # sqlrow
            elif isinstance(elem, bytes): ans.append(f"{k}=({len(elem)} len) {elem[:100]}..." if len(elem) > 100 else f"{k}=({len(elem)} len) {elem}") # sqlrow
            else: ans.append(f"{k}={elem}") # sqlrow
        return "(" + ", ".join(ans) + ")" # sqlrow
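# Reading an attribute on a sqlrow comes from the cached row, while assigning to a real column writes straight
# back to the database through sqltable._update(). A minimal sketch (assumes a "users" table with a "firstname"
# column; illustrative comments only):
#   row = users[1]            # sqlrow, or None if no row has id 1
#   row.firstname             # read the cached value
#   row.firstname = "Reimu"   # issues an "update users set firstname = ... where id = 1" behind the scenes
#   row.json()                # plain {column: value} dict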
settings.cli.atomic.baseAnd = (*settings.cli.atomic.baseAnd, sql, sqldb, sqltable, sqlrow) # sqlrow
boto3 = k1.dep("boto3") # the `if` check below is there so that the tests still run, since I have already loaded the creds from my startup.py # sqlrow
if "minio" not in settings.cred.__dict__: settings.cred.add("minio", k1lib.Settings().add("host", "https://localhost:9000", env="K1_MINIO_HOST").add("access_key", "", sensitive=True, env="K1_MINIO_ACCESS_KEY").add("secret_key", "", sensitive=True, env="K1_MINIO_SECRET_KEY"), "anything related to minio buckets, used by k1lib.cli.lsext.minio") # sqlrow
def minio(host=None, access_key=None, secret_key=None) -> "s3": # minio
    """Convenience function that constructs a :class:`s3` object but focused on minio. If the params are not specified, then it takes on the values in the settings "settings.cred.minio". Example::

        s = minio() # returns s3 instance
        s | ls()    # list all buckets, all other normal operations

    :param host: looks like "http://localhost:9000" """ # minio
    return s3(boto3.client('s3', # minio
        endpoint_url=host or settings.cred.minio.host, # minio
        aws_access_key_id=access_key or settings.cred.minio.access_key, # minio
        aws_secret_access_key=secret_key or settings.cred.minio.secret_key, # minio
        region_name='us-west-2')) # minio
class s3: # s3
    def __init__(self, client): # s3
        """Represents an S3 client. Example::

            client = boto3.client("s3", ...)    # put your credentials and details here
            db = s3(client)                     # creates an S3 manager
            db | ls()                           # lists all buckets accessible
            bucket = db | ls() | item()         # grabs the first bucket, returns object of type s3bucket
            bucket = db["bucket-name"]          # or you can instantiate the bucket directly
            bucket | ls()                       # lists all objects within this bucket
            bucket | ls() | grep("\\.so")       # grabs all .so files from the bucket
            obj = bucket | ls() | item()        # grabs the first object within this bucket, returns object of type s3obj
            obj = bucket["some_key"]            # or, grab a specific object
            obj.key, obj.size, obj.lastModified # some fields directly accessible
            del bucket["some_key"]              # deletes the object from the bucket
            obj = "abc\\ndef" | bucket.upload("somekey") # uploads a file by piping the contents in, returns s3obj
            obj = b"abc\\ndef" | bucket.upload()         # same as above, but with an auto-generated key

        This mostly offers interoperability with ls() and cat(), so that you can write relatively intuitive code, but fundamentally provides no upsides

        :param client: boto3 client""" # s3
        self.client = client; self._cachedLs = None # s3
    def _ls(self): self._cachedLs = {x["Name"]:s3bucket(self.client, x["Name"]) for x in self.client.list_buckets()["Buckets"]}; return list(self._cachedLs.values()) # s3
    def __len__(self): # s3
        if self._cachedLs is None: self._ls() # s3
        return len(self._cachedLs) # s3
    def __getitem__(self, name): # s3
        if self._cachedLs is None: self._ls() # s3
        if name not in self._cachedLs: self._ls() # s3
        if name not in self._cachedLs: raise Exception(f"Bucket '{name}' doesn't exist") # s3
        return self._cachedLs[name] # s3
    def __repr__(self): return f"<kaws.s3 client>" # s3
_s3bucketUploader_autoInc = k1lib.AutoIncrement(prefix=f"_key_{round(time.time()*1000)}_") # s3
class s3bucketUploader(cli.BaseCli): # s3bucketUploader
    def __init__(self, client, bucket:"s3bucket", key:str): # s3bucketUploader
        """Uploads the data piped in, then returns a :class:`s3obj`""" # s3bucketUploader
        self.client = client; self.bucket = bucket; self.key = key # s3bucketUploader
    def __ror__(self, it): # s3bucketUploader
        if isinstance(it, (bytes, str)): self.client.put_object(Bucket=self.bucket.name, Key=self.key, Body=(it if isinstance(it, bytes) else it.encode())) # this path is more direct and doesn't have to save to a file first # s3bucketUploader
        else: tmpFile = it | cli.file(); self.client.upload_file(tmpFile, self.bucket.name, self.key); os.remove(tmpFile) # s3bucketUploader
        return self.bucket[self.key] # s3bucketUploader
    def __repr__(self): return f"<s3bucketUploader bucket.name='{self.bucket.name}' key='{self.key}'>" # s3bucketUploader
class s3bucket: # s3bucket
    def __init__(self, client, name:str): # s3bucket
        """Represents an S3 bucket. Example::

            client = ...
            db = s3(client)
            bucket = db["bucket-name-here"]

        See also: :class:`s3`""" # s3bucket
        self.client = client; self.name = name; self._cachedLs = None # s3bucket
    def _ls(self): # s3bucket
        client = self.client; name = self.name # s3bucket
        self._cachedLs = {data["Key"]:s3obj(client, name, data) for data in self.client.list_objects(Bucket=name).get("Contents", [])} # s3bucket
        return list(self._cachedLs.values()) # s3bucket
    def __len__(self): # s3bucket
        if self._cachedLs is None: self._ls() # s3bucket
        return len(self._cachedLs) # s3bucket
    def upload(self, key:str=None): # s3bucket
        """Uploads some content to s3. Example::

            bucket = ...
            obj = "abc\\ndef" | bucket.upload()  # uploads some text
            obj = b"abc\\ndef" | bucket.upload() # uploads some bytes

        This works with whatever you can pipe into :class:`~k1lib.cli.output.file`. After uploading, you will receive a :class:`s3obj` object.

        :param key: if not specified, will auto generate a key""" # s3bucket
        if key is None: key = _s3bucketUploader_autoInc() # s3bucket
        return s3bucketUploader(self.client, self, key) # s3bucket
    def __repr__(self): return f"<s3bucket name='{self.name}'>" # s3bucket
    def __getitem__(self, key:str): # s3bucket
        res = self.client.head_object(Bucket=self.name, Key=key) # s3bucket
        data = {"Key": key, "LastModified": res["LastModified"], "Size": int(res["ResponseMetadata"]["HTTPHeaders"]["content-length"]), "StorageClass": None} # s3bucket
        return s3obj(self.client, self.name, data) # s3bucket
    def __delitem__(self, key:str): self.client.delete_object(Bucket=self.name, Key=key) # s3bucket
class s3obj: # s3obj
    def __init__(self, client, bucket:str, data): # s3obj
        """Represents an S3 object. Not intended to be instantiated directly. See also: :class:`s3`""" # s3obj
        self.client = client; self.bucket = bucket # s3obj
        self.key = data["Key"]; self.lastModified = data["LastModified"] | cli.toUnix() # s3obj
        self.size = data["Size"]; self.storageClass = data["StorageClass"] # s3obj
    def __repr__(self): return f"<s3obj bucket='{self.bucket}' key='{self.key}' size='{k1lib.fmt.size(self.size)}' lastModified='{self.lastModified | cli.toIso()}'>" # s3obj
    def _cat(self, kwargs): # s3obj
        sB = kwargs["sB"]; eB = kwargs["eB"] # s3obj
        if eB < 0: eB = self.size # s3obj
        res = self.client.get_object(Bucket=self.bucket, Key=self.key, Range=f'bytes={sB}-{eB-1}')["Body"].read() # s3obj
        if kwargs["text"]: return res.decode().split("\n") # s3obj
        if kwargs["chunks"]: return res | cli.batched(settings.cli.cat.chunkSize, True) # s3obj
        return res # s3obj
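# _cat() is what backs `obj | cat()`: it issues a ranged GET, so a slice of a large object can be read without
# downloading the whole thing. A sketch, assuming cat() forwards its usual text/sB/eB options to this hook
# (illustrative comments only):
#   obj = bucket["some_key"]
#   obj | cli.cat()               # whole object, decoded and split into lines
#   obj | cli.cat(text=False)     # whole object as raw bytes instead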
redis = k1lib.dep("redis") # s3obj
_redisAutoInc = k1lib.AutoIncrement(prefix=f"_redis_msg_{round(time.time())}_") # s3obj
settings.cred.add("redis", k1lib.Settings() # s3obj
    .add("host", "localhost", "location of the redis server", env="K1_REDIS_HOST") # s3obj
    .add("port", 6379, "port the redis server uses", env=("K1_REDIS_PORT", int)) # s3obj
    .add("expires", 120, "seconds before the message deletes itself. Can be float('inf'), or 'inf' for the env variable", env=("K1_REDIS_EXPIRES", lambda x: float(x.lower()))), # s3obj
    "anything related to redis, used by k1lib.cli.lsext.Redis") # s3obj
class Redis: # Redis
    def __init__(self, host=None, port=None, **kwargs): # Redis
        """Connects to a Redis server. Example::

            r = Redis("localhost", 6379)
            r["abc"] = {"some": "json", "number": 4} # sets value
            r["abc"]                    # reads value, returns python object
            key = r("some message")     # sets value with auto generated key, returns that key
            r[key]                      # reads that value, returns "some message"
            key = r("some message", 60) # sets value with auto generated key, with expires amount, returns that key

        You can actually leave the constructor empty, as it will automatically pull the data from "settings.cred.redis.host", which pulls from the env variable "K1_REDIS_HOST". So it can be as short as this::

            r = Redis()
            r["abc"] = {"some": "json", "number": 4} # sets value

        Internally, all objects are pickled and unpickled, so you can transport pretty much any Python object. Also, by default, each message will expire after 120s. You can adjust this via the "settings.cred.redis.expires" setting.

        This is really only meant as a quick way for different servers to communicate with each other. If your use case is different, don't use this.

        :param host: host name of the redis server
        :param port: port the redis server uses
        :param kwargs: extra kwargs sent to :class:`redis.Redis`""" # Redis
        self.host = host or settings.cred.redis.host; self.port = port or settings.cred.redis.port # Redis
        self.client = redis.Redis(host=self.host, port=self.port, **kwargs) # Redis
    def __getitem__(self, i): res = self.client.get(i); return None if res is None else dill.loads(res) # Redis
    def __setitem__(self, i, v, ex=None): ex = ex or settings.cred.redis.expires; self.client.set(i, dill.dumps(v), **({"ex":ex} if ex < float("inf") else {})) # Redis
    def __call__(self, v, ex=None) -> str: i = _redisAutoInc(); self.__setitem__(i, v, ex); return i # Redis
    def __repr__(self): return f"<Redis host='{self.host}:{self.port}'>" # Redis