Source code for k1lib.cli.lsext

# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
import k1lib as k1, os, json, k1lib, time, dill
import k1lib.cli as cli
from collections import deque
from functools import lru_cache
from contextlib import contextmanager
# Public API of this module; helper objects (mysqlCnf, s3bucketUploader, ...) are intentionally not exported
__all__ = ["sql", "sqldb", "sqltable", "sqlrow", "minio", "s3", "s3bucket", "s3obj", "Redis"]
# Shortcut to the library-wide settings tree; credentials used below live under `settings.cred`
settings = k1lib.settings
@contextmanager
def mysqlCnf(user, password, host, port):
    """Context manager yielding the path of a temporary MySQL option file.

    The file holds a ``[client]`` section with the given credentials and is
    meant to be passed to the ``mysql``/``mysqldump`` command line tools via
    ``--defaults-file``. The file is always deleted when the context exits.

    :param user: database user name
    :param password: database password, written as an empty string if falsy
    :param host: host of the database server
    :param port: port of the database server"""
    contents = f"""[client]\nuser = "{user}"\npassword = "{password or ''}"\nhost = "{host}"\nport = "{port}" """
    cnfPath = contents | cli.file()  # writes the text to a file and gives back its path
    try:
        yield cnfPath
    finally:
        os.remove(cnfPath)  # never leave credentials lying around on disk
# Registers the "settings.cred.sql" section. These values are the fallbacks used by `sql(...)`
# whenever the corresponding constructor argument is left unspecified.
# NOTE(review): "port" is read from K1_SQL_PORT without an int converter, unlike the redis port
# setting further down in this file - confirm whether a string port is acceptable to the drivers.
settings.cred.add("sql", k1lib.Settings()
    .add("mode", "my", env="K1_SQL_MODE")
    .add("host", "127.0.0.1", "host name of db server, or db file name if mode='lite'. Warning: mysql's mysqldump won't resolve domain names, so it's best to pass in ip addresses", env="K1_SQL_HOST")
    .add("port", 3306, env="K1_SQL_PORT")
    .add("user", "admin", sensitive=True, env="K1_SQL_USER")
    .add("password", "admin", sensitive=True, env="K1_SQL_PASSWORD"), "anything related to sql, used by k1lib.cli.lsext.sql. See docs for that class for more details")
# Database driver handles obtained through k1.dep(module, package, url); only the one matching `sql.mode` is used
mysqlConn = k1.dep("mysql.connector", "mysql-connector-python", "https://pypi.org/project/mysql-connector-python/")
pgConn = k1.dep("psycopg2", "psycopg2-binary", "https://pypi.org/project/psycopg2/")
sqlite3 = k1.dep("sqlite3")
qD = {"my": "`", "pg": "", "lite": ""} # quote dict: identifier quote character per sql mode
default = object() # sentinel meaning "argument not given", distinct from None/empty values
class sql:
    def __init__(self, host=default, port=default, user=default, password=default, mode=default):
        """Creates a connection to a SQL database. Example::

            s = sql("127.0.0.1") # creates a new sql object. Apparently, mysql and mysqldump on the command line bugs out if you put "localhost". If you put localhost here, it should work fine, but `sql(...) | toBytes()` and other functions that use external programs can deny you access
            s.refresh()          # refreshes connection any time you encounter strange bugs
            s | ls()             # returns List[sqldb], lists out all databases
            s | toBytes()        # returns Iterator[str], dumps every databases. Yes, it's "toBytes", but it has the feeling of serializing whatever the input is, so it's at least intuitive in that way
            "dump.sql" | s       # restores the database using the dump file
            cat("dump.sql") | s  # restores the database using the dump file

            db1 = s["db1"]       # returns sqldb, gets database named "db1"
            db1 | ls()           # returns List[sqltable], list out all tables within this database
            db1 | toBytes()      # returns Iterator[str], dumps the database
            users = db1["user"]  # gets table named "user", short and simple
            db1.query("select * from users")                     # queries the database using your custom query
            db1.query("select * from users where user_id=%s", 3) # queries with prepared statement

            users.info()         # prints out the first 10 rows of the table and the table schema
            users.cols           # returns table's columns as List[str]
            len(users)           # returns number of rows
            users.query(...)     # can also do a custom query, just like with databases
            users[1]             # grabs user with id 1. Returns None if user doesn't exist
            users[1:10]          # grabs users with id from 1 (inclusive) to 10 (exclusive)
            users[1].firstname = "Reimu" # sets the first name of user with id 1 to "Reimu"
            users[1] = {"firstname": "Reimu", "lastname": "Hakurei"} # sets first name and last name of user with id 1
            users.insert(firstname="Yuyuko", lastname="Saigyouji")   # inserts a new row
            users.insertBulk(firstname=["Yuyuko", "Marisa"], lastname=["Saigyouji", "Kirisame"]) # inserts multiple rows at once

            users | toBytes()                        # dumps this specific table, returns Iterator[str]
            users | cat() | display()                # reads entire table, gets first 10 rows and displays it out
            users | (cat() | head(20)) | display()   # reads first 20 rows only, then displays the first 10 rows. Query sent is "select * from user limit 20"
            users | (cat() | filt("x == 4", 3) | head(20)) | display() # grabs first 20 rows that has the 4th column equal to 4, then displays the first 10 rows. Query sent is "select user_id, address, balance, age from user where age = 4", assuming the table only has those columns

        Philosophy for these methods is that they should be intuitive to look at
        and interact with, not for performance. If performance is needed, just write
        raw queries or don't use a database and use applyCl() instead.

        If any of the params here are not specified, they will take on the values
        from "settings.cred.sql", which can be initialized from environment variables.
        Execute and display/print "settings" in a new cell to see all of them.

        :param host: host name, ip address, or file name (in case of sqlite)
        :param port: port at the host
        :param user: database user name. If the resolved value is empty then
            fallback to environment variable ``SQL_USER``, then ``USER``
        :param password: database password. If the resolved value is empty then
            fallback to environment variable ``SQL_PASSWORD``
        :param mode: currently supports 3 values: "my" (MySQL), "pg" (PostgreSQL) and "lite" (SQLite)
        :raises Exception: if ``mode`` is not one of the supported values"""
        cred = settings.cred.sql
        if host is default: host = cred.host
        if port is default: port = cred.port
        if user is default: user = cred.user
        if password is default: password = cred.password
        if mode is default: mode = cred.mode
        if mode not in ("my", "pg", "lite"): raise Exception(f"Supports only 'my' (MySQL), 'pg' (PostgreSQL) and 'lite' (SQLite) for now, don't support {mode}")
        if mode == "lite": host = os.path.expanduser(host) # host is a file name for sqlite
        self.host = host; self.port = port
        self.user = user or os.environ.get("SQL_USER") or os.environ.get("USER")
        self.password = password or os.environ.get("SQL_PASSWORD")
        self.db = None; self.mode = mode; self.conn = None
        self.star = "%s" if mode in ("my", "pg") else "?" # placeholder token for prepared statements
        self.refresh()

    def refresh(self):
        """Sometimes, the connection errors out for whatever reason, so
        this method is to reestablish the connection. You don't need to
        worry about this though, as on every query, if it detects that
        the connection has dropped, it will automatically try to refresh"""
        oldConn = getattr(self, "conn", None)
        if oldConn is not None:
            try: oldConn.close()
            except Exception: pass # best effort: the old connection may already be dead
            self.conn = None
        kwargs = dict(host=self.host, port=self.port, user=self.user, password=self.password, database=self.db)
        if self.mode == "my": self.conn = mysqlConn.connect(**kwargs, charset='utf8mb4', collation='utf8mb4_general_ci')
        elif self.mode == "pg": self.conn = pgConn.connect(**kwargs)
        elif self.mode == "lite": self.conn = sqlite3.connect(self.host)

    def _changeDb(self, db):
        """Makes ``db`` the active database of this connection, if it isn't already"""
        if self.db != db:
            if self.mode == "my": self.query(f"use `{db}`")
            elif self.mode == "pg": self.db = db; self.refresh() # reconnects, passing the new database name
            elif self.mode == "lite": pass # a sqlite file holds a single database
            self.db = db

    def cursor(self):
        """Creates a new cursor, auto-refreshes if necessary"""
        if self.mode == "my":
            try: cur = self.conn.cursor()
            except mysqlConn.errors.OperationalError: self.refresh(); cur = self.conn.cursor()
        elif self.mode == "pg" or self.mode == "lite":
            cur = self.conn.cursor()
        self._lastCur = cur # kept so callers can read things like `lastrowid` afterwards
        return cur

    def _query(self, query, *args):
        """Executes the query and returns ``(rows or None, cursor)``. The cursor is left open"""
        cur = self.cursor(); cur.execute(query, args)
        try: ans = cur.fetchall()
        except Exception: ans = None # statements without a result set have nothing to fetch
        return ans, cur

    def query(self, query, *args):
        """Executes a query. Returns the resulting table. Example::

            s.query("insert into users (name, age) values (%s, %s)", "Reimu", 25)

        :param query: sql statement, with placeholders for the values in ``args``
        :param args: values bound to the placeholders"""
        ans, cur = self._query(query, *args); cur.close(); self.conn.commit(); return ans

    def queryDesc(self, query, *args):
        """Executes a query. Returns the resulting table and the description of
        the table (with column names and whatnot)"""
        ans, cur = self._query(query, *args); desc = cur.description; cur.close(); self.conn.commit(); return ans, desc

    def queryMany(self, query, *args):
        """Executes multiple queries at the same time. Example::

            s.queryMany("insert into users (name, age) values (%s, %s)", [("Reimu", 25), ("Marisa", 26)])
        """
        cur = self.cursor(); cur.executemany(query, *args)
        try: ans = cur.fetchall()
        except Exception: ans = None # statements without a result set have nothing to fetch
        cur.close(); self.conn.commit(); return ans

    def _ls(self):
        """Lists all databases as :class:`sqldb` objects"""
        if self.mode == "my": return [sqldb(self, e[0]) for e in self.query("show databases")]
        elif self.mode == "pg": return [sqldb(self, e[0]) for e in self.query("select datname from pg_database where datistemplate=false")]
        elif self.mode == "lite": return [sqldb(self, "default")]

    def __repr__(self): return f"<sql mode={self.mode} host={self._host}>"

    @property
    def _host(self) -> str:
        """Short, human readable location of the server (or sqlite file name)"""
        if self.mode == "lite": return self.host.split(os.sep)[-1]
        else: return f"{self.host}:{self.port}"

    def _cnfCtx(self): return mysqlCnf(self.user, self.password, self.host, self.port)

    def _toBytes(self):
        """Yields the lines of a ``mysqldump`` of every database. MySQL only"""
        if self.mode == "my":
            with self._cnfCtx() as fn: yield from None | cli.cmd(f"mysqldump --defaults-file={fn} --single-transaction --hex-blob --all-databases")
        else: raise Exception(f"All databases dump of mode {self.mode} is not supported yet")

    def __ror__(self, it): # restoring a backup
        """Restores a dump. ``it`` is either the file name of the dump, or
        anything that can be piped into ``cli.file()``. MySQL only"""
        if self.mode == "my":
            def restore(fn):
                with self._cnfCtx() as cnfFn: None | cli.cmd(f"mysql --defaults-file={cnfFn} < {fn}") | cli.ignore()
            if isinstance(it, str): restore(it)
            else:
                fn = it | cli.file()
                try: restore(fn)
                finally: os.remove(fn) # remove the temp dump even if the restore failed
        else: raise Exception(f"Restoring database from .sql file of mode {self.mode} is not supported yet")

    def __len__(self): return len(self._ls())

    def __getitem__(self, idx):
        if isinstance(idx, str):
            lsres = [x for x in self._ls() if x.name == idx]
            if len(lsres) == 1: return lsres[0]
            else: raise Exception(f"Database '{idx}' does not exist, can't retrieve it")
        return self._ls()[idx]

    def __iter__(self): return iter(self._ls())

    def __delitem__(self, idx):
        """Drops the database named ``idx``. Only MySQL and PostgreSQL are supported"""
        if not isinstance(idx, str): raise Exception("Only supports deleting with database names. Use this like `del s['some_db']`")
        lsres = [x for x in self._ls() if x.name == idx]
        if len(lsres) != 1: raise Exception(f"Database '{idx}' does not exist, can't delete it")
        if self.mode == "my" or self.mode == "pg":
            q = qD[self.mode] # quote the identifier the same way the rest of the module does
            return self.query(f"drop database {q}{idx}{q}")
        else: raise Exception("Currently only support dropping databases in mysql and postgresql")
class sqldb:
    def __init__(self, sql:"sql", name:str):
        """A sql database representation. Not expected to be instantiated by you.
        See also: :class:`sql`

        :param sql: the :class:`sql` connection this database lives in
        :param name: name of the database"""
        self.sql = sql; self.name = name

    def query(self, query, *args):
        """Switches the connection to this database, then runs :meth:`sql.query`"""
        self.sql._changeDb(self.name); return self.sql.query(query, *args)

    def queryDesc(self, query, *args):
        """Switches the connection to this database, then runs :meth:`sql.queryDesc`"""
        self.sql._changeDb(self.name); return self.sql.queryDesc(query, *args)

    def queryMany(self, query, *args):
        """Switches the connection to this database, then runs :meth:`sql.queryMany`"""
        self.sql._changeDb(self.name); return self.sql.queryMany(query, *args)

    def _ls(self):
        """Lists all tables of this database as :class:`sqltable` objects"""
        if self.sql.mode == "my": return [sqltable(self.sql, self, e[0]) for e in self.query(f"show tables")]
        if self.sql.mode == "pg": return [sqltable(self.sql, self, e[0]) for e in self.query(f"select table_name from information_schema.tables")]
        if self.sql.mode == "lite": return [sqltable(self.sql, self, e[0]) for e in self.query("select name from sqlite_master where type='table'")]

    def __repr__(self): return f"<sqldb host={self.sql._host} db={self.name}>"

    def _toBytes(self):
        """Yields the lines of a ``mysqldump`` of this database. MySQL only"""
        if self.sql.mode == "my":
            with self.sql._cnfCtx() as fn: yield from None | cli.cmd(f"mysqldump --defaults-file={fn} --single-transaction --hex-blob --databases {self.name}")
        else: raise Exception(f"Database dump of mode {self.sql.mode} is not supported yet")

    def __ror__(self, it):
        """Restores a dump into this database, by prepending a ``USE`` statement
        and handing everything to :meth:`sql.__ror__`.

        :param it: iterable of dump lines, or (like :meth:`sql.__ror__`) the
            file name of a dump"""
        q = qD[self.sql.mode]
        if isinstance(it, str):
            # a plain string is a file name; splatting it below would yield single characters
            with open(os.path.expanduser(it)) as f: it = f.read().split("\n")
        return self.sql.__ror__([f"USE {q}{self.name}{q};", *it])

    def __len__(self): return len(self._ls())

    def __getitem__(self, idx):
        if isinstance(idx, str):
            lsres = [x for x in self._ls() if x.name == idx]
            if len(lsres) == 1: return lsres[0]
            else: raise Exception(f"Table '{idx}' does not exist, can't retrieve it")
        return self._ls()[idx]

    def __iter__(self): return iter(self._ls())

    def __delitem__(self, idx):
        """Drops the table named ``idx``"""
        if not isinstance(idx, str): raise Exception("Only supports deleting with table names. Use this like `del db['some_table']`")
        lsres = [x for x in self._ls() if x.name == idx]
        if len(lsres) == 1: return self.query(f"drop table {idx}")
        else: raise Exception(f"Table '{idx}' does not exist, can't delete it")
class sqltable:
    def __init__(self, sql, sqldb, name:str):
        """A sql table representation. Not expected to be instantiated by you.
        See also: :class:`sql`

        :param sql: the :class:`sql` connection
        :param sqldb: the :class:`sqldb` this table belongs to
        :param name: name of the table"""
        self.sql = sql; self.sqldb = sqldb; self.name = name; self._cols = None

    def _colQuote(self) -> str:
        """Quote character for column names in generated insert/update statements.
        PostgreSQL does not understand backticks, so nothing is used there"""
        return "" if self.sql.mode == "pg" else "`"

    def _cat(self, ser):
        """Reads the table, folding leading ``head()``/``cut()`` clis of the
        serial ``ser`` into the sql query, and running the rest in Python"""
        cols = self.cols; _2 = [] # clis that can't be optimized, stashed away to be merged with ser later on
        q = qD[self.sql.mode]; clis = deque(ser.clis)
        o1 = None # cut() opt
        o2 = None # head() opt
        while len(clis) > 0:
            c = clis.popleft()
            if isinstance(c, cli.filt): _2.append(c); break # TODO: add optimizations for filt
            elif o2 is None and isinstance(c, cli.head):
                # the None check has to come first, round(None) would raise
                if c.n is None or round(c.n) != c.n or c.n < 0 or c.inverted: _2.append(c); break
                else: o2 = f"limit {c.n}"; continue
            elif o1 is None and isinstance(c, cli.cut):
                if isinstance(c.columns, slice): _2.append(c); o1 = 0; continue # 0: "seen, but not optimized"
                else:
                    o1 = ", ".join([f"{q}{col}{q}" for col in cols | cli.rows(*c.columns)])
                    # a single-column cut() returns bare values instead of rows, so unpack them
                    if len(c.columns) == 1: _2.append(cli.item().all() | cli.aS(list))
            else: _2.append(c); break
        o1 = o1 or ", ".join([f"{q}{col}{q}" for col in cols])
        query = f"select {o1} from {q}{self.name}{q} {o2 or ''}"
        sql = self.sql; return [sqlrow(sql, self, row) for row in self.sqldb.query(query) | cli.serial(*_2, *clis)]

    @property
    def cols(self):
        """Get column names"""
        if not self._cols: self._cols = self._describe()[1:] | cli.cut({"my": 0, "pg": 0, "lite": 1}[self.sql.mode]) | cli.deref()
        return self._cols

    @lru_cache
    def _describe(self):
        """Returns the table schema, as a table with a header row. Cached per table object"""
        if self.sql.mode == "my": return self.sqldb.query(f"describe `{self.name}`") | cli.insert(["Field", "Type", "Null", "Key", "Default", "Extra"]) | cli.deref()
        if self.sql.mode == "pg": return self.sqldb.query(f"select column_name, data_type, is_nullable, column_default, ordinal_position from information_schema.columns where table_name={self.sql.star}", self.name) | cli.insert(["column_name", "data_type", "is_nullable", "column_default", "ordinal_position"]) | cli.deref()
        if self.sql.mode == "lite": return self.sqldb.query(f"pragma table_info([{self.name}])") | cli.insert(["cid", "name", "type", "notnull", "dflt_value", "pk"]) | cli.deref()

    def insert(self, **kwargs):
        """Inserts a row. Example::

            table = ...
            table.insert(firstname="Yuyuko", lastname="Saigyouji")

        :return: the inserted :class:`sqlrow` for MySQL and SQLite, None for PostgreSQL"""
        q = self._colQuote()
        keys = ", ".join([f"{q}{x}{q}" for x in kwargs.keys()]); values = ", ".join([self.sql.star]*len(kwargs))
        self.query(f"insert into {self.name} ({keys}) values ({values})", *kwargs.values())
        if self.sql.mode == "my": return self[self.query("select last_insert_id()")[0][0]]
        if self.sql.mode == "lite": return self[self.sql._lastCur.lastrowid]

    def insertBulk(self, **kwargs):
        """Inserts multiple rows at once. Each keyword is a column, each value
        is the list of values of that column. Example::

            table = ...
            table.insertBulk(firstname=["Yuyuko", "Marisa"], lastname=["Saigyouji", "Kirisame"])

        :return: the :class:`sqlrow` the driver reports as last inserted for
            MySQL and SQLite, None for PostgreSQL"""
        q = self._colQuote()
        keys = ", ".join([f"{q}{x}{q}" for x in kwargs.keys()]); values = ", ".join([self.sql.star]*len(kwargs))
        self.queryMany(f"insert into {self.name} ({keys}) values ({values})", list(kwargs.values() | cli.T()))
        if self.sql.mode == "my": return self[self.query("select last_insert_id()")[0][0]]
        if self.sql.mode == "lite": return self[self.sql._lastCur.lastrowid]

    def _update(self, idx, **kwargs):
        """Updates a row. Example::

            table = ...
            table._update(3, firstname="Youmu", lastname="Konpaku")

        :param idx: value of the first column (the id) of the row to update
        :param kwargs: column name -> new value. Nothing happens if empty"""
        if not kwargs: return # "update ... set  where ..." would be a syntax error
        q = self._colQuote()
        p1 = ", ".join([f"{q}{k}{q} = {self.sql.star}" for k in kwargs.keys()])
        # the id is bound as a parameter too, so string ids work and can't inject sql
        self.query(f"update {self.name} set {p1} where {self.cols[0]} = {self.sql.star}", *kwargs.values(), idx)

    def info(self, out=False):
        """Preview table. Example::

            table = ...
            table.info()

        :param out: if True, returns a list of lines instead of printing them out"""
        def gen():
            desc = self._describe() | cli.deref(); cols = self.cols; mode = self.sql.mode; q = qD[mode]; s = ", ".join([f"{q}{e}{q}" for e in cols])
            if mode == "my":
                status = self.sqldb.query(f"show table status like '{self.name}'")[0]
                status = f"engine:{status[1]}, #rows(approx):{status[4]}, len(row):{status[5]}, updated:{status[12]}UTC, created:{status[11]}UTC"
            elif mode == "lite":
                nrows = self.query(f"select max(rowid) from {q}{self.name}{q}")[0][0]
                ncols = len(cols) # same as the width of "select *", but also works on empty tables
                status = f"#rows(approx):{nrows}, len(row):{ncols}"
            else: nrows = self.sqldb.query(f"select count(*) from {q}{self.name}{q}")[0][0]; status = f"{nrows} rows total"
            print(f"Table `{self.name}` ({status})\n"); self.sqldb.query(f"select {s} from {q}{self.name}{q} limit 9") | (cli.aS(repr) | cli.head(50)).all(2) | cli.insert(cols) | cli.display()
            # print tails
            if mode == "lite": print("...\n..."); self.sqldb.query(f"select {s} from {q}{self.name}{q} order by rowid desc limit 9") | cli.reverse() | (cli.aS(repr) | cli.head(50)).all(2) | cli.insert(cols) | cli.display()
            if mode == "my" or mode == "pg":
                try:
                    data = self.sqldb.query(f"select {s} from {q}{self.name}{q} order by {cols[0]} desc limit 9") | cli.reverse() | (cli.aS(repr) | cli.head(50)).all(2) | cli.insert(cols) | cli.deref()
                    print("...\n..."); data | cli.display()
                except Exception: pass # the tail preview is best effort only
            print(""); desc | cli.display(None)
        if out:
            with k1.captureStdout() as out: gen()
            return out()
        else: gen()

    def query(self, query, *args): return self.sqldb.query(query, *args)
    def queryDesc(self, query, *args): return self.sqldb.queryDesc(query, *args)
    def queryMany(self, query, *args): return self.sqldb.queryMany(query, *args)

    def select(self, query, *args):
        """Pretty much identical to :meth:`query`, but this postprocess
        the results a little bit, and package them into :class:`sqlrow` objects"""
        res = self.query(query, *args); sql = self.sql; return [sqlrow(sql, self, e) for e in res]

    def __repr__(self): return f"<sqltable host={self.sql._host} db={self.sqldb.name} table={self.name}>"

    def _toBytes(self):
        """Yields the lines of a ``mysqldump`` of this table. MySQL only"""
        if self.sql.mode == "my":
            with self.sql._cnfCtx() as fn: yield from None | cli.cmd(f"mysqldump --defaults-file={fn} --single-transaction --hex-blob {self.sqldb.name} {self.name}")
        else: raise Exception(f"Table dump of mode {self.sql.mode} is not supported yet")

    def __ror__(self, it): return self.sqldb.__ror__(it)
    def __len__(self): return self.query(f"select count(*) from {self.name}")[0][0]

    def lookup(self, **kwargs):
        """Convenience function to lookup 1 instance with the specified value. Example::

            user = users.lookup(firstname="Reimu")
            # multiple columns work too
            user = users.lookup(firstname="Reimu", lastname="Hakurei")

        :return: the first matching :class:`sqlrow`, or None if there's no match"""
        p1 = " AND ".join([f"{k} = {self.sql.star}" for k in kwargs.keys()])
        res = self.query(f"select * from {self.name} where {p1} limit 1", *kwargs.values())
        return None if len(res) == 0 else sqlrow(self.sql, self, res[0])

    def _rowsByIds(self, idName, idxs):
        """Fetches the rows with the given integer ids. Returns one entry per id,
        in order, with None for ids that don't exist"""
        if len(idxs) == 0: return [] # "in ()" is not valid sql
        res = self.query(f"select * from {self.name} where {idName} in ({', '.join([str(x) for x in idxs])})")
        d = {row[0]:row for row in res}; sql = self.sql
        return [sqlrow(sql, self, d[i]) if i in d else None for i in idxs]

    def __getitem__(self, idx):
        idName = self.cols[0] # the first column is treated as the id column
        if idx is None: return None
        elif isinstance(idx, (int, str)):
            res = self.query(f"select * from {self.name} where {idName} = {self.sql.star}", idx)
            if len(res) == 0: return None
            return sqlrow(self.sql, self, res[0])
        elif isinstance(idx, slice):
            stop = idx.stop
            if stop is None:
                maxId = self.query(f"select max({idName}) from {self.name}")[0][0]
                stop = 0 if maxId is None else maxId+1 # max() is null on an empty table
            return self._rowsByIds(idName, list(range(stop)[idx]))
        else:
            idxs = list(idx)
            # does not support string because that might lead to sql injection
            if all([isinstance(e, int) for e in idxs]): return self._rowsByIds(idName, idxs)
        raise Exception("Only support table indexing of integers or strings or slices or list[int]")

    def __setitem__(self, idx, value):
        if not isinstance(value, dict): raise Exception("Only accepts setting elements with dicts")
        if self[idx] is None: raise Exception(f"Can't set element with id {idx}, it doesn't seem to exist. Use table.insert(col1=value1, col2=value2) instead")
        self._update(idx, **value)

    def __iter__(self): return iter(self | cli.cat())

    def __delitem__(self, idx):
        """Deletes the row with id ``idx``, or all existing rows within a slice of ids"""
        idName = self.cols[0]
        if isinstance(idx, (int, str)):
            res = self[idx]
            if res: self.query(f"delete from {self.name} where {idName} = {self.sql.star}", res[0])
        elif isinstance(idx, slice):
            ids = [row[0] for row in self[idx] if row is not None]
            if len(ids) == 0: return # nothing in that range exists
            placeholders = ", ".join([self.sql.star]*len(ids))
            self.query(f"delete from {self.name} where {idName} in ({placeholders})", *ids)
        else: raise Exception("Only support table indexing of integers or strings or slices")
class sqlrow:
    """A single row of a :class:`sqltable`. Column values are available both by
    position (``row[0]``) and as attributes (``row.firstname``). Assigning to a
    column attribute writes the change through to the database."""
    def __init__(self, sql, sqltable, row):
        # while the sentinel is on, attribute writes go straight to __dict__ and never hit the database
        self._ab_sentinel = True
        self._sql = sql
        self._sqltable = sqltable
        self._row = row
        for colName, colValue in zip(sqltable.cols, row):
            self.__dict__[colName] = colValue
        self._ab_sentinel = False

    def __getitem__(self, idx): return self._row[idx]

    def __setattr__(self, attr, value):
        store = self.__dict__
        # plain attributes (and everything during construction) are only stored locally
        if attr == "_ab_sentinel" or self._ab_sentinel or attr not in self._sqltable.cols:
            store[attr] = value
            return
        # column attribute: persist first, then mirror the change on this object
        table = self._sqltable
        table._update(self._row[0], **{attr: value})
        store[attr] = value
        position = dict(zip(table.cols, range(len(self._row))))[attr]
        updated = list(self._row)
        updated[position] = value
        self._row = updated

    def __len__(self): return len(self._row)

    def __repr__(self):
        def describe(elem):
            # long strings/bytes are truncated to their first 100 elements, prefixed with the full length
            if isinstance(elem, str):
                shown = json.dumps(elem[:100]) + "..." if len(elem) > 100 else json.dumps(elem)
                return f"({len(elem)} len) {shown}"
            if isinstance(elem, bytes):
                shown = f"{elem[:100]}..." if len(elem) > 100 else f"{elem}"
                return f"({len(elem)} len) {shown}"
            return f"{elem}"
        return "(" + ", ".join([describe(elem) for elem in self._row]) + ")"
boto3 = k1.dep("boto3") # if below so that the tests would run since I have loaded the creds from my startup.py # sqlrow if "minio" not in settings.cred.__dict__: settings.cred.add("minio", k1lib.Settings().add("host", "https://localhost:9000", env="K1_MINIO_HOST").add("access_key", "", sensitive=True, env="K1_MINIO_ACCESS_KEY").add("secret_key", "", sensitive=True, env="K1_MINIO_SECRET_KEY"), "anything related to minio buckets, used by k1lib.cli.lsext.minio") # sqlrow
def minio(host=None, access_key=None, secret_key=None) -> "s3":
    """Builds a :class:`s3` object that talks to a minio server. Any
    parameter left out (or falsy) is read from "settings.cred.minio"
    instead. Example::

        s = minio() # returns s3 instance
        s | ls()    # list all buckets, all other normal operations

    :param host: looks like "http://localhost:9000"
    :param access_key: access key id used to authenticate
    :param secret_key: secret access key used to authenticate"""
    endpoint = host or settings.cred.minio.host
    keyId = access_key or settings.cred.minio.access_key
    secret = secret_key or settings.cred.minio.secret_key
    client = boto3.client('s3',
                          endpoint_url=endpoint,
                          aws_access_key_id=keyId,
                          aws_secret_access_key=secret,
                          region_name='us-west-2')
    return s3(client)
class s3:
    def __init__(self, client):
        """Thin wrapper around a boto3 S3 client. Example::

            client = boto3.client("s3", ...)   # put your credentials and details here
            db = s3(client)                    # creates an S3 manager
            db | ls()                          # lists all buckets accessible
            bucket = db | ls() | item()        # grabs the first bucket, returns object of type s3bucket
            bucket = db["bucket-name"]         # or you can instantiate the bucket directly
            bucket | ls()                      # lists all objects within this bucket
            bucket | ls() | grep("\\.so")      # grabs all .so files from the bucket
            obj = bucket | ls() | item()       # grabs the first object within this bucket, returns object of type s3obj
            obj = bucket["some_key"]           # or, grab a specific object
            obj.key, obj.size, obj.lastModified # some fields directly accessible
            obj = "abc\\ndef" | bucket.upload("somekey") # uploads a file by piping the contents in, returns s3obj
            obj = b"abc\\ndef"| bucket.upload()           # same as above, but with an auto-generated key

        This mostly offers interoperability with ls() and cat(), so that you can
        write relatively intuitive code, but fundamentally provides no upsides

        :param client: boto3 client"""
        self.client = client
        self._cachedLs = None # bucket name -> s3bucket, filled in by _ls()

    def _ls(self):
        """Lists all buckets, refreshing the internal name -> bucket cache"""
        buckets = {}
        for info in self.client.list_buckets()["Buckets"]:
            bucketName = info["Name"]
            buckets[bucketName] = s3bucket(self.client, bucketName)
        self._cachedLs = buckets
        return list(buckets.values())

    def __len__(self):
        if self._cachedLs is None: self._ls()
        return len(self._cachedLs)

    def __getitem__(self, name):
        if self._cachedLs is None: self._ls()
        # the bucket might have been created after the last listing, so look again before giving up
        if name not in self._cachedLs: self._ls()
        if name in self._cachedLs: return self._cachedLs[name]
        raise Exception(f"Bucket '{name}' doesn't exist")

    def __repr__(self): return f"<kaws.s3 client>"
# generates unique keys for uploads that don't specify one; the prefix embeds the import time in ms
_s3bucketUploader_autoInc = k1lib.AutoIncrement(prefix=f"_key_{round(time.time()*1000)}_")
class s3bucketUploader(cli.BaseCli):
    def __init__(self, client, bucket:"s3bucket", key:str):
        """Uploads the data piped in, then returns a :class:`s3obj`

        :param client: boto3 client
        :param bucket: destination :class:`s3bucket`
        :param key: key the data will be stored under"""
        self.client = client; self.bucket = bucket; self.key = key

    def __ror__(self, it):
        if isinstance(it, (bytes, str)):
            # this path is more direct and doesn't have to save to a file first
            self.client.put_object(Bucket=self.bucket.name, Key=self.key, Body=(it if isinstance(it, bytes) else it.encode()))
        else:
            tmpFile = it | cli.file()
            try: self.client.upload_file(tmpFile, self.bucket.name, self.key)
            finally: os.remove(tmpFile) # don't leave the temp file behind when the upload fails
        return self.bucket[self.key]

    def __repr__(self): return f"<s3bucketUploader bucket.name='{self.bucket.name}' key='{self.key}'>"
class s3bucket:
    def __init__(self, client, name:str):
        """A single bucket of an S3 server. Example::

            client = ...
            db = s3(client)
            bucket = db["bucket-name-here"]

        See also: :class:`s3`

        :param client: boto3 client
        :param name: name of the bucket"""
        self.client = client
        self.name = name
        self._cachedLs = None # key -> s3obj, filled in by _ls()

    def _ls(self):
        """Lists the objects of this bucket, refreshing the internal key -> object cache"""
        # NOTE(review): a single list_objects call is made; presumably large buckets
        # come back truncated to one page - confirm against the boto3 docs
        response = self.client.list_objects(Bucket=self.name)
        objs = {}
        for data in response.get("Contents", []):
            objs[data["Key"]] = s3obj(self.client, self.name, data)
        self._cachedLs = objs
        return list(objs.values())

    def __len__(self):
        if self._cachedLs is None: self._ls()
        return len(self._cachedLs)

    def upload(self, key:str=None):
        """Returns a cli that uploads whatever is piped into it. Example::

            bucket = ...
            obj = "abc\\ndef" | bucket.upload()  # uploads some text
            obj = b"abc\\ndef" | bucket.upload() # uploads some bytes

        This works with whatever you can pipe into :class:`~k1lib.cli.output.file`.
        After uploading, you will receive a :class:`s3obj` object.

        :param key: if not specified, will auto generate a key"""
        targetKey = _s3bucketUploader_autoInc() if key is None else key
        return s3bucketUploader(self.client, self, targetKey)

    def __repr__(self): return f"<s3bucket name='{self.name}'>"

    def __getitem__(self, key:str):
        # ask the server about this one object instead of listing the whole bucket
        head = self.client.head_object(Bucket=self.name, Key=key)
        size = int(head["ResponseMetadata"]["HTTPHeaders"]["content-length"])
        data = {"Key": key, "LastModified": head["LastModified"], "Size": size, "StorageClass": None}
        return s3obj(self.client, self.name, data)
class s3obj:
    def __init__(self, client, bucket:str, data):
        """A single object stored in an S3 bucket. Not intended to be
        instantiated directly. See also: :class:`s3`

        :param client: boto3 client
        :param bucket: name of the bucket the object is in
        :param data: dict with keys "Key", "LastModified", "Size" and "StorageClass\""""
        self.client = client
        self.bucket = bucket
        self.key = data["Key"]
        self.lastModified = data["LastModified"] | cli.toUnix()
        self.size = data["Size"]
        self.storageClass = data["StorageClass"]

    def __repr__(self): return f"<s3obj bucket='{self.bucket}' key='{self.key}' size='{k1lib.fmt.size(self.size)}' lastModified='{self.lastModified | cli.toIso()}'>"

    def _cat(self, kwargs):
        """Downloads the byte range [sB, eB) of the object. A negative ``eB``
        means "till the end". Returns a list of lines if ``kwargs["text"]``,
        batches of bytes if ``kwargs["chunks"]``, else the raw bytes"""
        start = kwargs["sB"]; end = kwargs["eB"]
        if end < 0: end = self.size
        # the end of the Range header is inclusive, hence the -1
        response = self.client.get_object(Bucket=self.bucket, Key=self.key, Range=f'bytes={start}-{end-1}')
        payload = response["Body"].read()
        if kwargs["text"]:
            return payload.decode().split("\n")
        if kwargs["chunks"]:
            return payload | cli.batched(settings.cli.cat.chunkSize, True)
        return payload
redis = k1lib.dep("redis") # s3obj _redisAutoInc = k1lib.AutoIncrement(prefix=f"_redis_msg_{round(time.time())}_") # s3obj settings.cred.add("redis", k1lib.Settings() # s3obj .add("host", "localhost", "location of the redis server", env="K1_REDIS_HOST") # s3obj .add("port", 6379, "port the redis server use", env=("K1_REDIS_PORT", int)) # s3obj .add("expires", 60, "seconds before the message deletes itself. Can be float('inf'), or 'inf' for the env variable", env=("K1_REDIS_EXPIRES", lambda x: float(x.lower()))), # s3obj "anything related to redis, used by k1lib.cli.lsext.Redis") # s3obj
class Redis:
    def __init__(self, host=None, port=None, **kwargs):
        """Connects to Redis server. Example::

            r = Redis("localhost", 6379)
            r["abc"] = {"some": "json", "number": 4} # sets value
            r["abc"]                  # reads value, returns python object
            key = r("some message")   # sets value with auto generated key, returns that key
            r[key]                    # reads that value, returns "some message"
            r("some message", 60)     # sets value with auto generated key, with expires amount, returns that key

        You can actually leave the constructors empty, as it will automatically pull
        the data from "settings.cred.redis.host", which pulls from the env variable
        "K1_REDIS_HOST". So it can be as short as this::

            r = Redis()
            r["abc"] = {"some": "json", "number": 4} # sets value

        Internally, all objects are pickled and unpickled, so you can transport
        pretty much any Python object. Also, by default, each message will expire
        after 60s, You can adjust this via "settings.cred.redis.expires" setting.

        This is really only meant as a quick way for different servers to
        communicate with each other. If your use case is different, don't use this.

        :param host: host name of the redis server
        :param port: port the redis server uses
        :param kwargs: extra kwargs sent to :class:`redis.Redis`"""
        self.host = host or settings.cred.redis.host; self.port = port or settings.cred.redis.port
        self.client = redis.Redis(host=self.host, port=self.port, **kwargs)

    @staticmethod
    def _expiryKwargs(ex) -> dict:
        """Translates an expiry in seconds into kwargs for the client's ``set()``.
        Whole numbers become an integer ``ex`` (seconds), fractional ones an
        integer ``px`` (milliseconds), and an infinite expiry becomes no kwargs
        at all. The client only takes integer amounts, while an expiry that
        comes from the env variable is always a float"""
        if not ex < float("inf"): return {}
        if isinstance(ex, int): return {"ex": ex}
        if float(ex).is_integer(): return {"ex": int(ex)}
        return {"px": max(1, int(round(ex*1000)))}

    def __getitem__(self, i):
        res = self.client.get(i)
        # SECURITY: unpickling runs arbitrary code, so only use this with a redis server you trust
        return None if res is None else dill.loads(res)

    def __setitem__(self, i, v, ex=None):
        ex = ex or settings.cred.redis.expires
        self.client.set(i, dill.dumps(v), **self._expiryKwargs(ex))

    def __call__(self, v, ex=None) -> str:
        """Stores ``v`` under an auto generated key and returns that key

        :param ex: seconds before the value expires, defaults to "settings.cred.redis.expires\""""
        i = _redisAutoInc(); self.__setitem__(i, v, ex); return i

    def __repr__(self): return f"<Redis host='{self.host}:{self.port}'>"