# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
import k1lib as k1, os, json, k1lib, time, dill
import k1lib.cli as cli
from collections import deque
from functools import lru_cache
from contextlib import contextmanager
__all__ = ["sql", "sqldb", "sqltable", "sqlrow", "minio", "s3", "s3bucket", "s3obj", "Redis"]
settings = k1lib.settings
@contextmanager
def mysqlCnf(user, password, host, port):
    """Writes a temporary MySQL option file and yields its path.

    The file holds a single ``[client]`` section with the given credentials, so
    that command line tools can be pointed at it via ``--defaults-file``. The
    file is removed when the ``with`` block exits, whether or not it raised.

    :param user: database user name
    :param password: database password. Falsy values are written as an empty string
    :param host: host of the database server
    :param port: port of the database server"""
    pw = password or ''
    sections = [
        "[client]",
        f'user = "{user}"',
        f'password = "{pw}"',
        f'host = "{host}"',
        f'port = "{port}" ',
    ]
    cnfPath = "\n".join(sections) | cli.file()
    try:
        yield cnfPath
    finally:
        os.remove(cnfPath)
# Default SQL credentials, registered under "settings.cred.sql". The sql class below
# falls back to these whenever a constructor argument is left unspecified.
settings.cred.add("sql", k1lib.Settings() # mysqlCnf
.add("mode", "my", env="K1_SQL_MODE") # mysqlCnf
.add("host", "127.0.0.1", "host name of db server, or db file name if mode='lite'. Warning: mysql's mysqldump won't resolve domain names, so it's best to pass in ip addresses", env="K1_SQL_HOST") # mysqlCnf
.add("port", 3306, env="K1_SQL_PORT") # mysqlCnf
.add("user", "admin", sensitive=True, env="K1_SQL_USER") # mysqlCnf
.add("password", "admin", sensitive=True, env="K1_SQL_PASSWORD"), "anything related to sql, used by k1lib.cli.lsext.sql. See docs for that class for more details") # mysqlCnf
# Database driver modules, obtained through k1.dep (presumably so that a missing
# optional package only errors out when it is actually used - confirm against k1.dep)
mysqlConn = k1.dep("mysql.connector", "mysql-connector-python", "https://pypi.org/project/mysql-connector-python/") # mysqlCnf
pgConn = k1.dep("psycopg2", "psycopg2-binary", "https://pypi.org/project/psycopg2/") # mysqlCnf
sqlite3 = k1.dep("sqlite3") # mysqlCnf
# Maps sql mode -> character wrapped around identifiers when building queries
qD = {"my": "`", "pg": "", "lite": ""} # quote dict # mysqlCnf
# Sentinel meaning "argument not given", used as the default value in sql.__init__
default = object() # mysqlCnf
class sql:
    def __init__(self, host=default, port=default, user=default, password=default, mode=default):
        """Creates a connection to a SQL database.
        Example::

            s = sql("127.0.0.1") # creates a new sql object. Apparently, mysql and mysqldump on the command line bugs out if you put "localhost". If you put localhost here, it should work fine, but `sql(...) | toBytes()` and other functions that use external programs can deny you access
            s.refresh() # refreshes connection any time you encounter strange bugs
            s | ls() # returns List[sqldb], lists out all databases
            s | toBytes() # returns Iterator[str], dumps every databases. Yes, it's "toBytes", but it has the feeling of serializing whatever the input is, so it's at least intuitive in that way
            "dump.sql" | s # restores the database using the dump file
            cat("dump.sql") | s # restores the database using the dump file

            db1 = s["db1"] # returns sqldb, gets database named "db1"
            db1 | ls() # returns List[sqltable], list out all tables within this database
            db1 | toBytes() # returns Iterator[str], dumps the database

            users = db1["user"] # gets table named "user", short and simple
            db1.query("select * from users") # queries the database using your custom query
            db1.query("select * from users where user_id=%s", 3) # queries with prepared statement

            users.info() # prints out the first 10 rows of the table and the table schema
            users.cols # returns table's columns as List[str]
            len(users) # returns number of rows
            users.query(...) # can also do a custom query, just like with databases
            users[1] # grabs user with id 1. Returns None if user doesn't exist
            users[1:10] # grabs users with id from 1 (inclusive) to 10 (exclusive)
            users[1].firstname = "Reimu" # sets the first name of user with id 1 to "Reimu"
            users[1] = {"firstname": "Reimu", "lastname": "Hakurei"} # sets first name and last name of user with id 1
            users.insert(firstname="Yuyuko", lastname="Saigyouji") # inserts a new row
            users.insertBulk(firstname=["Yuyuko", "Marisa"], lastname=["Saigyouji", "Kirisame"]) # inserts multiple rows at once

            users | toBytes() # dumps this specific table, returns Iterator[str]
            users | cat() | display() # reads entire table, gets first 10 rows and displays it out
            users | (cat() | head(20)) | display() # reads first 20 rows only, then displays the first 10 rows. Query sent is "select * from user limit 20"
            users | (cat() | filt("x == 4", 3) | head(20)) | display() # grabs first 20 rows that has the 4th column equal to 4, then displays the first 10 rows. Query sent is "select user_id, address, balance, age from user where age = 4", assuming the table only has those columns

        Philosophy for these methods is that they should be intuitive to look at and interact with, not for performance.
        If performance is needed, just write raw queries or don't use a database and use applyCl() instead.

        If any of the params here are not specified, they will take on the values from
        "settings.cred.sql", which can be initialized from environment variables.
        Execute and display/print "settings" in a new cell to see all of them.

        :param host: host name, ip address, or file name (in case of sqlite)
        :param port: port at the host
        :param user: database user name. If not specified then fallback to environment variable ``SQL_USER``, then ``USER``
        :param password: database password. If not specified then assume database doesn't require one
        :param mode: currently supports 3 values: "my" (MySQL), "pg" (PostgreSQL) and "lite" (SQLite)"""
        host = settings.cred.sql.host if host is default else host
        port = settings.cred.sql.port if port is default else port
        user = settings.cred.sql.user if user is default else user
        password = settings.cred.sql.password if password is default else password
        mode = settings.cred.sql.mode if mode is default else mode
        if mode not in ("my", "pg", "lite"): raise Exception(f"Supports only 'my' (MySQL), 'pg' (PostgreSQL) and 'lite' (SQLite) for now, don't support {mode}")
        if mode == "lite": host = os.path.expanduser(host)
        self.host = host; self.port = port
        self.user = user or os.environ.get("SQL_USER") or os.environ.get("USER")
        self.password = password or os.environ.get("SQL_PASSWORD")
        self.db = None; self.mode = mode; self.conn = None
        self.star = "%s" if mode == "my" or mode == "pg" else "?" # placeholder token of the driver's prepared statements
        self.refresh()

    def refresh(self):
        """Sometimes, the connection errors out for whatever reason, so this method is to
        reestablish the connection. You don't need to worry about this though, as on every query,
        if it detects that the connection has dropped, it will automatically try to refresh"""
        if self.conn is not None:
            try: self.conn.close()
            except Exception: pass # best effort: the old connection is likely dead already
            self.conn = None
        kwargs = dict(host=self.host, port=self.port, user=self.user, password=self.password, database=self.db)
        if self.mode == "my": self.conn = mysqlConn.connect(**kwargs, charset='utf8mb4', collation='utf8mb4_general_ci')
        elif self.mode == "pg": self.conn = pgConn.connect(**kwargs)
        elif self.mode == "lite": self.conn = sqlite3.connect(self.host)

    def _changeDb(self, db):
        """Makes ``db`` the active database of the connection, if it isn't already"""
        if self.db != db:
            if self.mode == "my": self.query(f"use `{db}`")
            elif self.mode == "pg": self.db = db; self.refresh() # reconnects with the new database name
            elif self.mode == "lite": pass
            self.db = db

    def cursor(self):
        """Creates a new cursor, auto-refreshes if necessary"""
        if self.mode == "my":
            try: cur = self.conn.cursor()
            except mysqlConn.errors.OperationalError: self.refresh(); cur = self.conn.cursor()
        elif self.mode == "pg" or self.mode == "lite":
            cur = self.conn.cursor()
        self._lastCur = cur; return cur

    def _query(self, query, *args):
        """Executes the query, returns ``(rows or None, cursor)``. Caller closes the cursor"""
        cur = self.cursor(); cur.execute(query, args)
        try: ans = cur.fetchall()
        except Exception: ans = None # statements without a result set make the drivers raise here
        return ans, cur

    def query(self, query, *args):
        """Executes a query. Returns the resulting table.
        Example::

            s.query("insert into users (name, age) values (%s, %s)", "Reimu", 25)
        """
        ans, cur = self._query(query, *args); cur.close(); self.conn.commit(); return ans

    def queryDesc(self, query, *args):
        """Executes a query. Returns the resulting table and the description of the table (with column names and whatnot)"""
        ans, cur = self._query(query, *args); desc = cur.description; cur.close(); self.conn.commit(); return ans, desc

    def queryMany(self, query, *args):
        """Executes multiple queries at the same time.
        Example::

            s.queryMany("insert into users (name, age) values (%s, %s)", [("Reimu", 25), ("Marisa", 26)])
        """
        cur = self.cursor(); cur.executemany(query, *args)
        try: ans = cur.fetchall()
        except Exception: ans = None # statements without a result set make the drivers raise here
        cur.close(); self.conn.commit(); return ans

    def _ls(self):
        """Lists all databases as :class:`sqldb` objects"""
        if self.mode == "my": return [sqldb(self, e[0]) for e in self.query("show databases")]
        elif self.mode == "pg": return [sqldb(self, e[0]) for e in self.query("select datname from pg_database where datistemplate=false")]
        elif self.mode == "lite": return [sqldb(self, "default")]

    def __repr__(self): return f"<sql mode={self.mode} host={self._host}>"

    @property
    def _host(self) -> str:
        """Short description of the host: file name for sqlite, "host:port" otherwise"""
        if self.mode == "lite": return self.host.split(os.sep)[-1]
        else: return f"{self.host}:{self.port}"

    def _cnfCtx(self): return mysqlCnf(self.user, self.password, self.host, self.port)

    def _toBytes(self):
        """Dumps every database, yields the lines of the dump. MySQL only"""
        if self.mode == "my":
            with self._cnfCtx() as fn: yield from None | cli.cmd(f"mysqldump --defaults-file={fn} --single-transaction --hex-blob --all-databases")
        else: raise Exception(f"All databases dump of mode {self.mode} is not supported yet")

    def __ror__(self, it): # restoring a backup
        """Restores from a dump. ``it`` is either a file name, or an iterator of lines"""
        if self.mode == "my":
            def restore(fn):
                with self._cnfCtx() as cnfFn: None | cli.cmd(f"mysql --defaults-file={cnfFn} < {fn}") | cli.ignore()
            if isinstance(it, str): restore(it)
            else:
                fn = it | cli.file()
                try: restore(fn)
                finally: os.remove(fn) # don't leave the temp dump behind if the restore fails
        else: raise Exception(f"Restoring database from .sql file of mode {self.mode} is not supported yet")

    def __len__(self): return len(self._ls())

    def __getitem__(self, idx):
        if isinstance(idx, str):
            lsres = [x for x in self._ls() if x.name == idx]
            if len(lsres) == 1: return lsres[0]
            else: raise Exception(f"Database '{idx}' does not exist, can't retrieve it")
        return self._ls()[idx]

    def __iter__(self): return iter(self._ls())

    def __delitem__(self, idx):
        """Drops the database named ``idx``. Only MySQL and PostgreSQL are supported"""
        if not isinstance(idx, str): raise Exception("Only supports deleting with database names. Use this like `del s['some_db']`")
        lsres = [x for x in self._ls() if x.name == idx]
        if len(lsres) != 1: raise Exception(f"Database '{idx}' does not exist, can't delete it")
        if self.mode == "my" or self.mode == "pg":
            q = qD[self.mode] # quote the identifier so names like "my-db" work
            return self.query(f"drop database {q}{idx}{q}")
        raise Exception("Currently only support dropping databases in mysql and postgresql")
class sqldb:
    def __init__(self, sql:sql, name:str):
        """A sql database representation. Not expected to be instantiated by you. See also: :class:`sql`

        :param sql: the :class:`sql` connection this database lives in
        :param name: name of the database"""
        self.sql = sql; self.name = name

    def query(self, query, *args):
        """Switches to this database, then executes the query. See :meth:`sql.query`"""
        self.sql._changeDb(self.name); return self.sql.query(query, *args)

    def queryDesc(self, query, *args):
        """Switches to this database, then executes the query. See :meth:`sql.queryDesc`"""
        self.sql._changeDb(self.name); return self.sql.queryDesc(query, *args)

    def queryMany(self, query, *args):
        """Switches to this database, then executes the queries. See :meth:`sql.queryMany`"""
        self.sql._changeDb(self.name); return self.sql.queryMany(query, *args)

    def _ls(self):
        """Lists all tables of this database as :class:`sqltable` objects"""
        if self.sql.mode == "my": return [sqltable(self.sql, self, e[0]) for e in self.query("show tables")]
        if self.sql.mode == "pg": return [sqltable(self.sql, self, e[0]) for e in self.query("select table_name from information_schema.tables")]
        if self.sql.mode == "lite": return [sqltable(self.sql, self, e[0]) for e in self.query("select name from sqlite_master where type='table'")]

    def __repr__(self): return f"<sqldb host={self.sql._host} db={self.name}>"

    def _toBytes(self):
        """Dumps this database, yields the lines of the dump. MySQL only"""
        if self.sql.mode == "my":
            with self.sql._cnfCtx() as fn: yield from None | cli.cmd(f"mysqldump --defaults-file={fn} --single-transaction --hex-blob --databases {self.name}")
        else: raise Exception(f"Database dump of mode {self.sql.mode} is not supported yet")

    def __ror__(self, it):
        """Restores a dump into this database. ``it`` is either a file name, or an
        iterator of lines. A ``USE`` statement is prepended so that the dump lands here."""
        q = qD[self.sql.mode]; use = f"USE {q}{self.name}{q};"
        if isinstance(it, str): # file name: read its lines, splatting the string itself would yield single characters
            with open(it) as f: return self.sql.__ror__([use, *(line.rstrip("\n") for line in f)])
        return self.sql.__ror__([use, *it])

    def __len__(self): return len(self._ls())

    def __getitem__(self, idx):
        if isinstance(idx, str):
            lsres = [x for x in self._ls() if x.name == idx]
            if len(lsres) == 1: return lsres[0]
            else: raise Exception(f"Table '{idx}' does not exist, can't retrieve it")
        return self._ls()[idx]

    def __iter__(self): return iter(self._ls())

    def __delitem__(self, idx):
        """Drops the table named ``idx``"""
        if not isinstance(idx, str): raise Exception("Only supports deleting with table names. Use this like `del db['some_table']`")
        lsres = [x for x in self._ls() if x.name == idx]
        if len(lsres) != 1: raise Exception(f"Table '{idx}' does not exist, can't delete it")
        q = qD[self.sql.mode] # quote the identifier so unusual table names work
        return self.query(f"drop table {q}{idx}{q}")
class sqltable:
    def __init__(self, sql, sqldb, name:str):
        """A sql table representation. Not expected to be instantiated by you. See also: :class:`sql`

        :param sql: the :class:`sql` connection
        :param sqldb: the :class:`sqldb` this table belongs to
        :param name: name of the table"""
        self.sql = sql; self.sqldb = sqldb; self.name = name; self._cols = None
        self._desc = None # per-instance cache of the table description

    def _colQuote(self) -> str:
        """Quote character for column names in write queries. PostgreSQL rejects backticks"""
        return "" if self.sql.mode == "pg" else "`"

    def _cat(self, ser):
        """Reads the table, folding leading head()/cut() clis of ``ser`` into the sql query"""
        cols = self.cols; _2 = [] # clis that can't be optimized, stashed away to be merged with ser later on
        q = qD[self.sql.mode]; clis = deque(ser.clis)
        o1 = None # cut() opt
        o2 = None # head() opt
        while len(clis) > 0:
            c = clis.popleft()
            if isinstance(c, cli.filt): _2.append(c); break # TODO: add optimizations for filt
            elif o2 is None and isinstance(c, cli.head):
                # None check comes first, as round(None) would throw
                if c.n is None or c.inverted or round(c.n) != c.n or c.n < 0: _2.append(c); break
                else: o2 = f"limit {int(c.n)}"; continue # int() so that 5.0 doesn't turn into "limit 5.0"
            elif o1 is None and isinstance(c, cli.cut):
                if isinstance(c.columns, slice): _2.append(c); o1 = 0; continue
                else:
                    o1 = ", ".join([f"{q}{col}{q}" for col in cols | cli.rows(*c.columns)])
                    if len(c.columns) == 1: _2.append(cli.item().all() | cli.aS(list))
            else: _2.append(c); break
        o1 = o1 or ", ".join([f"{q}{col}{q}" for col in cols])
        query = f"select {o1} from {q}{self.name}{q} {o2 or ''}"
        sql = self.sql; return [sqlrow(sql, self, row) for row in self.sqldb.query(query) | cli.serial(*_2, *clis)]

    @property
    def cols(self):
        """Get column names"""
        if not self._cols: self._cols = self._describe()[1:] | cli.cut({"my": 0, "pg": 0, "lite": 1}[self.sql.mode]) | cli.deref()
        return self._cols

    def _describe(self):
        """Returns the table schema, header row first. Cached on this instance"""
        if self._desc is None: self._desc = self._fetchDescription()
        return self._desc

    def _fetchDescription(self):
        """Queries the database for the table schema"""
        if self.sql.mode == "my": return self.sqldb.query(f"describe `{self.name}`") | cli.insert(["Field", "Type", "Null", "Key", "Default", "Extra"]) | cli.deref()
        if self.sql.mode == "pg": return self.sqldb.query(f"select column_name, data_type, is_nullable, column_default, ordinal_position from information_schema.columns where table_name='{self.name}'") | cli.insert(["column_name", "data_type", "is_nullable", "column_default", "ordinal_position"]) | cli.deref()
        if self.sql.mode == "lite": return self.sqldb.query(f"pragma table_info([{self.name}])") | cli.insert(["cid", "name", "type", "notnull", "dflt_value", "pk"]) | cli.deref()

    def _insertQuery(self, columns) -> str:
        """Builds the parameterized insert statement for the given column names"""
        q = self._colQuote(); columns = list(columns)
        keys = ", ".join([f"{q}{x}{q}" for x in columns]); values = ", ".join([self.sql.star]*len(columns))
        return f"insert into {self.name} ({keys}) values ({values})"

    def _lastInserted(self):
        """Returns the most recently inserted row. Only for MySQL and SQLite, else returns None"""
        if self.sql.mode == "my": return self[self.query("select last_insert_id()")[0][0]]
        if self.sql.mode == "lite": return self[self.sql._lastCur.lastrowid]

    def insert(self, **kwargs):
        """Inserts a row.
        Example::

            table = ...
            table.insert(firstname="Yuyuko", lastname="Saigyouji")
        """
        self.query(self._insertQuery(kwargs.keys()), *kwargs.values())
        return self._lastInserted()

    def insertBulk(self, **kwargs):
        """Inserts multiple rows at once. Every keyword holds the list of values of that column.
        Example::

            table = ...
            table.insertBulk(firstname=["Yuyuko", "Marisa"], lastname=["Saigyouji", "Kirisame"])
        """
        self.queryMany(self._insertQuery(kwargs.keys()), list(kwargs.values() | cli.T()))
        return self._lastInserted()

    def _update(self, idx, **kwargs):
        """Updates a row.
        Example::

            table = ...
            table._update(3, firstname="Youmu", lastname="Konpaku")

        :param idx: value of the first column (the id) of the row to update
        :param kwargs: new values, keyed by column name"""
        if len(kwargs) == 0: return # nothing to set, "update ... set where ..." is not valid sql
        q = self._colQuote(); star = self.sql.star
        p1 = ", ".join([f"{q}{k}{q} = {star}" for k in kwargs.keys()])
        # id goes in as a bound parameter, so string ids work and can't inject sql
        self.query(f"update {self.name} set {p1} where {self.cols[0]} = {star}", *kwargs.values(), idx)

    def info(self, out=False):
        """Preview table.
        Example::

            table = ...
            table.info()

        :param out: if True, returns a list of lines instead of printing them out"""
        def gen():
            desc = self._describe() | cli.deref(); cols = self.cols; mode = self.sql.mode; q = qD[mode]; s = ", ".join([f"{q}{e}{q}" for e in cols])
            if mode == "my":
                status = self.sqldb.query(f"show table status like '{self.name}'")[0]
                status = f"engine:{status[1]}, #rows(approx):{status[4]}, len(row):{status[5]}, updated:{status[12]}UTC, created:{status[11]}UTC"
            elif mode == "lite":
                nrows = self.query(f"select max(rowid) from {q}{self.name}{q}")[0][0]
                firstRow = self.query(f"select * from {q}{self.name}{q} limit 1")
                ncols = len(firstRow[0]) if len(firstRow) > 0 else len(cols) # empty table has no first row
                status = f"#rows(approx):{nrows}, len(row):{ncols}"
            else: nrows = self.sqldb.query(f"select count(*) from {q}{self.name}{q}")[0][0]; status = f"{nrows} rows total"
            print(f"Table `{self.name}` ({status})\n"); self.sqldb.query(f"select {s} from {q}{self.name}{q} limit 9") | (cli.aS(repr) | cli.head(50)).all(2) | cli.insert(cols) | cli.display()
            # print tails
            if mode == "lite": print("...\n..."); self.sqldb.query(f"select {s} from {q}{self.name}{q} order by rowid desc limit 9") | cli.reverse() | (cli.aS(repr) | cli.head(50)).all(2) | cli.insert(cols) | cli.display()
            if mode == "my" or mode == "pg":
                try:
                    data = self.sqldb.query(f"select {s} from {q}{self.name}{q} order by {cols[0]} desc limit 9") | cli.reverse() | (cli.aS(repr) | cli.head(50)).all(2) | cli.insert(cols) | cli.deref()
                    print("...\n..."); data | cli.display()
                except Exception: pass # tail preview is best effort, first column might not be sortable
            print(""); desc | cli.display(None)
        if out:
            with k1.captureStdout() as captured: gen()
            return captured()
        else: gen()

    def query(self, query, *args):
        """Executes a query inside this table's database. See :meth:`sql.query`"""
        return self.sqldb.query(query, *args)

    def queryDesc(self, query, *args):
        """Executes a query inside this table's database. See :meth:`sql.queryDesc`"""
        return self.sqldb.queryDesc(query, *args)

    def queryMany(self, query, *args):
        """Executes queries inside this table's database. See :meth:`sql.queryMany`"""
        return self.sqldb.queryMany(query, *args)

    def select(self, query, *args):
        """Pretty much identical to :meth:`query`, but this postprocess the results
        a little bit, and package them into :class:`sqlrow` objects"""
        res = self.query(query, *args); sql = self.sql; return [sqlrow(sql, self, e) for e in res]

    def __repr__(self): return f"<sqltable host={self.sql._host} db={self.sqldb.name} table={self.name}>"

    def _toBytes(self):
        """Dumps this table, yields the lines of the dump. MySQL only"""
        if self.sql.mode == "my":
            with self.sql._cnfCtx() as fn: yield from None | cli.cmd(f"mysqldump --defaults-file={fn} --single-transaction --hex-blob {self.sqldb.name} {self.name}")
        else: raise Exception(f"Table dump of mode {self.sql.mode} is not supported yet")

    def __ror__(self, it): return self.sqldb.__ror__(it)

    def __len__(self): return self.query(f"select count(*) from {self.name}")[0][0]

    def lookup(self, **kwargs):
        """Convenience function to lookup 1 instance with the specified value.
        Example::

            user = users.lookup(firstname="Reimu")
            # multiple columns work too
            user = users.lookup(firstname="Reimu", lastname="Hakurei")
        """
        p1 = " AND ".join([f"{k} = {self.sql.star}" for k in kwargs.keys()])
        res = self.query(f"select * from {self.name} where {p1} limit 1", *kwargs.values())
        return None if len(res) == 0 else sqlrow(self.sql, self, res[0])

    def _fetchByIds(self, idName, ids):
        """Fetches rows with the given integer ids. Returns a list aligned with ``ids``,
        with None in place of every id that doesn't exist"""
        if len(ids) == 0: return [] # "in ()" is not valid sql everywhere
        res = self.query(f"select * from {self.name} where {idName} in ({', '.join([str(x) for x in ids])})")
        d = {row[0]: row for row in res}; sql = self.sql
        return [sqlrow(sql, self, d[i]) if i in d else None for i in ids]

    def __getitem__(self, idx):
        """Grabs rows by id (value of the first column). Accepts a single int or str,
        a slice, or an iterable of ints. Missing rows come back as None"""
        idName = self.cols[0]
        if idx is None: return None
        elif isinstance(idx, (int, str)):
            res = self.query(f"select * from {self.name} where {idName} = {self.sql.star}", idx)
            if len(res) == 0: return None
            return sqlrow(self.sql, self, res[0])
        elif isinstance(idx, slice):
            stop = idx.stop
            if stop is None:
                maxId = self.query(f"select max({idName}) from {self.name}")[0][0]
                if maxId is None: return [] # empty table
                stop = maxId + 1
            return self._fetchByIds(idName, list(range(stop)[idx]))
        else:
            idxs = list(idx) # materialize once, idx might be a one-shot iterator
            if all([isinstance(e, int) for e in idxs]): # does not support string because that might lead to sql injection
                return self._fetchByIds(idName, idxs)
            raise Exception("Only support table indexing of integers or strings or slices or list[int]")

    def __setitem__(self, idx, value):
        if not isinstance(value, dict): raise Exception("Only accepts setting elements with dicts")
        if self[idx] is None: raise Exception(f"Can't set element with id {idx}, it doesn't seem to exist. Use table.insert(col1=value1, col2=value2) instead")
        self._update(idx, **value)

    def __iter__(self): return iter(self | cli.cat())

    def __delitem__(self, idx):
        """Deletes rows by id. Accepts a single int or str, or a slice"""
        idName = self.cols[0]
        if isinstance(idx, (int, str)):
            res = self[idx]
            if res: self.query(f"delete from {self.name} where {idName} = {self.sql.star}", res[0])
        elif isinstance(idx, slice):
            ids = [row[0] for row in self[idx] if row]
            if len(ids) == 0: return
            placeholders = ", ".join([self.sql.star]*len(ids))
            self.query(f"delete from {self.name} where {idName} in ({placeholders})", *ids)
        else: raise Exception("Only support table indexing of integers or strings or slices")
class sqlrow:
    def __init__(self, sql, sqltable, row):
        """A single row of a :class:`sqltable`. Column values are exposed both by
        position (``row[0]``) and as attributes (``row.firstname``). Assigning to
        a column attribute writes the new value through to the table.

        :param sql: the sql connection
        :param sqltable: table this row belongs to, must have ``cols`` and ``_update``
        :param row: sequence of the column values"""
        # while the sentinel is up, attribute writes go straight to __dict__
        self._ab_sentinel = True
        self._sql = sql
        self._sqltable = sqltable
        self._row = row
        for colName, colValue in zip(sqltable.cols, row):
            self.__dict__[colName] = colValue
        self._ab_sentinel = False

    def __getitem__(self, idx):
        return self._row[idx]

    def __setattr__(self, attr, value):
        state = self.__dict__
        # plain attributes (and everything during construction) are just stored
        if attr == "_ab_sentinel" or self._ab_sentinel or attr not in self._sqltable.cols:
            state[attr] = value
            return
        # column attribute: push to the table first, then mirror it locally
        self._sqltable._update(self._row[0], **{attr: value})
        state[attr] = value
        positions = dict(zip(self._sqltable.cols, range(len(self._row))))
        fresh = list(self._row)
        fresh[positions[attr]] = value
        self._row = fresh

    def __len__(self):
        return len(self._row)

    def __repr__(self):
        def show(elem):
            # long strings and bytes are cut to their first 100 elements
            if isinstance(elem, str):
                if len(elem) > 100: return f"({len(elem)} len) {json.dumps(elem[:100])}..."
                return f"({len(elem)} len) {json.dumps(elem)}"
            if isinstance(elem, bytes):
                if len(elem) > 100: return f"({len(elem)} len) {elem[:100]}..."
                return f"({len(elem)} len) {elem}"
            return f"{elem}"
        return "(" + ", ".join(show(elem) for elem in self._row) + ")"
# boto3 client library, obtained through k1.dep
boto3 = k1.dep("boto3") # sqlrow
# Default minio credentials, registered under "settings.cred.minio". Guarded by the
# "if" so that credentials registered earlier (the author loads them from a startup.py,
# which lets the tests run) are not overwritten.
if "minio" not in settings.cred.__dict__: settings.cred.add("minio", k1lib.Settings().add("host", "https://localhost:9000", env="K1_MINIO_HOST").add("access_key", "", sensitive=True, env="K1_MINIO_ACCESS_KEY").add("secret_key", "", sensitive=True, env="K1_MINIO_SECRET_KEY"), "anything related to minio buckets, used by k1lib.cli.lsext.minio") # sqlrow
def minio(host=None, access_key=None, secret_key=None) -> "s3":
    """Shortcut that builds a :class:`s3` object pointed at a minio server.
    Every param that is left out (or falsy) falls back to the matching field of
    "settings.cred.minio". Example::

        s = minio() # returns s3 instance
        s | ls()    # list all buckets, all other normal operations

    :param host: looks like "http://localhost:9000"
    :param access_key: access key of the minio account
    :param secret_key: secret key of the minio account"""
    endpoint = host or settings.cred.minio.host
    keyId = access_key or settings.cred.minio.access_key
    secret = secret_key or settings.cred.minio.secret_key
    client = boto3.client('s3', endpoint_url=endpoint, aws_access_key_id=keyId,
                          aws_secret_access_key=secret, region_name='us-west-2')
    return s3(client)
class s3:
    def __init__(self, client):
        """Represents an S3 client.
        Example::

            client = boto3.client("s3", ...)   # put your credentials and details here
            db = s3(client)                    # creates an S3 manager
            db | ls()                          # lists all buckets accessible
            bucket = db | ls() | item()        # grabs the first bucket, returns object of type s3bucket
            bucket = db["bucket-name"]         # or you can instantiate the bucket directly

            bucket | ls()                      # lists all objects within this bucket
            bucket | ls() | grep("\\.so")      # grabs all .so files from the bucket
            obj = bucket | ls() | item()       # grabs the first object within this bucket, returns object of type s3obj
            obj = bucket["some_key"]           # or, grab a specific object
            obj.key, obj.size, obj.lastModified # some fields directly accessible

            obj = "abc\\ndef" | bucket.upload("somekey") # uploads a file by piping the contents in, returns s3obj
            obj = b"abc\\ndef"| bucket.upload()          # same as above, but with an auto-generated key

        This mostly offers interoperability with ls() and cat(), so that you can
        write relatively intuitive code, but fundamentally provides no upsides

        :param client: boto3 client"""
        self.client = client
        self._cachedLs = None # bucket name -> s3bucket, filled by _ls()

    def _ls(self):
        """Fetches all buckets, refreshes the cache and returns them as a list"""
        found = {}
        for meta in self.client.list_buckets()["Buckets"]:
            bucketName = meta["Name"]
            found[bucketName] = s3bucket(self.client, bucketName)
        self._cachedLs = found
        return list(found.values())

    def __len__(self):
        if self._cachedLs is None:
            self._ls()
        return len(self._cachedLs)

    def __getitem__(self, name):
        if self._cachedLs is None:
            self._ls()
        if name not in self._cachedLs:
            self._ls() # the bucket might have been created after the last listing
            if name not in self._cachedLs:
                raise Exception(f"Bucket '{name}' doesn't exist")
        return self._cachedLs[name]

    def __repr__(self):
        return "<kaws.s3 client>"
# Generates object keys for s3bucket.upload() when the caller doesn't specify one.
# The prefix embeds the module load time in milliseconds.
_s3bucketUploader_autoInc = k1lib.AutoIncrement(prefix=f"_key_{round(time.time()*1000)}_") # s3
class s3bucketUploader(cli.BaseCli):
    def __init__(self, client, bucket:"s3bucket", key:str):
        """Uploads the data piped in, then returns a :class:`s3obj`

        :param client: boto3 client
        :param bucket: bucket to upload into
        :param key: key the uploaded object will be stored under"""
        self.client = client; self.bucket = bucket; self.key = key

    def __ror__(self, it):
        """Uploads ``it``. str and bytes are sent directly, anything else is first
        written to a temporary file, which is removed afterwards."""
        if isinstance(it, (bytes, str)): # this path is more direct and doesn't have to save to a file first
            body = it if isinstance(it, bytes) else it.encode()
            self.client.put_object(Bucket=self.bucket.name, Key=self.key, Body=body)
        else:
            tmpFile = it | cli.file()
            try: self.client.upload_file(tmpFile, self.bucket.name, self.key)
            finally: os.remove(tmpFile) # don't leave the temp file behind if the upload fails
        return self.bucket[self.key]

    def __repr__(self): return f"<s3bucketUploader bucket.name='{self.bucket.name}' key='{self.key}'>"
class s3bucket:
    def __init__(self, client, name:str):
        """Represents an S3 bucket.
        Example::

            client = ...
            db = s3(client)
            bucket = db["bucket-name-here"]

        See also: :class:`s3`

        :param client: boto3 client
        :param name: name of the bucket"""
        self.client = client; self.name = name; self._cachedLs = None

    def _ls(self):
        """Lists every object in the bucket, refreshes the cache and returns them as a list"""
        client = self.client; name = self.name; found = {}
        # a single list_objects call returns at most 1000 keys, so walk through all pages
        for page in client.get_paginator("list_objects").paginate(Bucket=name):
            for data in page.get("Contents", []): found[data["Key"]] = s3obj(client, name, data)
        self._cachedLs = found
        return list(found.values())

    def __len__(self):
        if self._cachedLs is None: self._ls()
        return len(self._cachedLs)

    def upload(self, key:str=None):
        """Uploads some content to s3.
        Example::

            bucket = ...
            obj = "abc\\ndef" | bucket.upload()  # uploads some text
            obj = b"abc\\ndef" | bucket.upload() # uploads some bytes

        This works with whatever you can pipe into :class:`~k1lib.cli.output.file`. After uploading,
        you will receive a :class:`s3obj` object.

        :param key: if not specified, will auto generate a key"""
        if key is None: key = _s3bucketUploader_autoInc()
        return s3bucketUploader(self.client, self, key)

    def __repr__(self): return f"<s3bucket name='{self.name}'>"

    def __getitem__(self, key:str):
        """Grabs the object stored under ``key``, using a HEAD request for its metadata"""
        res = self.client.head_object(Bucket=self.name, Key=key)
        data = {"Key": key, "LastModified": res["LastModified"], "Size": int(res["ResponseMetadata"]["HTTPHeaders"]["content-length"]), "StorageClass": None}
        return s3obj(self.client, self.name, data)
class s3obj:
    def __init__(self, client, bucket:str, data):
        """Represents an S3 object. Not intended to be instantiated directly.
        See also: :class:`s3`

        :param client: boto3 client
        :param bucket: name of the bucket holding this object
        :param data: dict with keys "Key", "LastModified", "Size" and "StorageClass" """
        self.client = client
        self.bucket = bucket
        self.key = data["Key"]
        self.lastModified = data["LastModified"] | cli.toUnix()
        self.size = data["Size"]
        self.storageClass = data["StorageClass"]

    def __repr__(self):
        prettySize = k1lib.fmt.size(self.size)
        modified = self.lastModified | cli.toIso()
        return f"<s3obj bucket='{self.bucket}' key='{self.key}' size='{prettySize}' lastModified='{modified}'>"

    def _cat(self, kwargs):
        """Downloads a byte range of the object.

        :param kwargs: dict with "sB" (start byte), "eB" (end byte, exclusive, negative
            means till the end), "text" (return list of lines) and "chunks" (return batches of bytes)"""
        start = kwargs["sB"]
        end = kwargs["eB"]
        if end < 0:
            end = self.size
        byteRange = f'bytes={start}-{end-1}' # http ranges are inclusive on both ends
        response = self.client.get_object(Bucket=self.bucket, Key=self.key, Range=byteRange)
        payload = response["Body"].read()
        if kwargs["text"]:
            return payload.decode().split("\n")
        if kwargs["chunks"]:
            return payload | cli.batched(settings.cli.cat.chunkSize, True)
        return payload
# redis client library, obtained through k1lib.dep
redis = k1lib.dep("redis") # s3obj
# Generates keys for Redis.__call__, which stores a value without the caller naming a key.
# The prefix embeds the module load time in seconds.
_redisAutoInc = k1lib.AutoIncrement(prefix=f"_redis_msg_{round(time.time())}_") # s3obj
# Default redis connection details, registered under "settings.cred.redis"
settings.cred.add("redis", k1lib.Settings() # s3obj
.add("host", "localhost", "location of the redis server", env="K1_REDIS_HOST") # s3obj
.add("port", 6379, "port the redis server use", env=("K1_REDIS_PORT", int)) # s3obj
.add("expires", 60, "seconds before the message deletes itself. Can be float('inf'), or 'inf' for the env variable", env=("K1_REDIS_EXPIRES", lambda x: float(x.lower()))), # s3obj
"anything related to redis, used by k1lib.cli.lsext.Redis") # s3obj
class Redis:
    def __init__(self, host=None, port=None, **kwargs):
        """Connects to Redis server.
        Example::

            r = Redis("localhost", 6379)
            r["abc"] = {"some": "json", "number": 4} # sets value
            r["abc"]                                 # reads value, returns python object
            key = r("some message")                  # sets value with auto generated key, returns that key
            r[key]                                   # reads that value, returns "some message"
            key = r("some message", 60)              # same, but the message expires after 60 seconds

        You can actually leave the constructors empty, as it will automatically pull the data from
        "settings.cred.redis.host", which pulls from the env variable "K1_REDIS_HOST". So it can
        be as short as this::

            r = Redis()
            r["abc"] = {"some": "json", "number": 4} # sets value

        Internally, all objects are pickled and unpickled, so you can transport pretty much
        any Python object. Also, by default, each message will expire after 60s, You can
        adjust this via "settings.cred.redis.expires" setting.

        This is really only meant as a quick way for different servers to communicate
        with each other. If your use case is different, don't use this.

        :param host: host name of the redis server
        :param port: port the redis server uses
        :param kwargs: extra kwargs sent to :class:`redis.Redis`"""
        self.host = host or settings.cred.redis.host; self.port = port or settings.cred.redis.port
        self.client = redis.Redis(host=self.host, port=self.port, **kwargs)

    def __getitem__(self, i):
        """Reads the value stored under key ``i``, returns None if it doesn't exist"""
        res = self.client.get(i)
        # NOTE(security): this unpickles whatever the server holds, so only use with a trusted redis server
        return None if res is None else dill.loads(res)

    def __setitem__(self, i, v, ex=None):
        """Stores ``v`` under key ``i``.

        :param ex: seconds until the value expires. Falls back to "settings.cred.redis.expires"
            if not specified. Infinity means the value never expires"""
        import math
        ex = ex or settings.cred.redis.expires
        payload = dill.dumps(v)
        # redis only accepts whole seconds, while the setting can be a float (when read from the env variable)
        if ex < float("inf"): self.client.set(i, payload, ex=math.ceil(ex))
        else: self.client.set(i, payload)

    def __call__(self, v, ex=None) -> str:
        """Stores ``v`` under an auto generated key, returns that key"""
        i = _redisAutoInc(); self.__setitem__(i, v, ex); return i

    def __repr__(self): return f"<Redis host='{self.host}:{self.port}'>"