# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
import json
import os
import shlex
import threading
import time
from collections import deque
from contextlib import contextmanager
from functools import lru_cache

import dill
import k1lib
import k1lib as k1
import k1lib.cli as cli
__all__ = ["sql", "sqldb", "sqltable", "sqlrow", "minio", "s3", "s3bucket", "s3obj", "Redis"]
psycopg2 = k1.dep("psycopg2", "psycopg2-binary", "https://pypi.org/project/psycopg2/")
class PgConn:
    """A lazily-opened connection to one PostgreSQL database.

    The psycopg2 connection is only created on the first query, and is put
    into autocommit mode right after it is opened.
    """

    def __init__(self, p: "PgConns", db: str):
        """
        :param p: parent :class:`PgConns` that holds host, port, user and password
        :param db: name of the database to connect to"""
        self.p = p
        self.db = db
        self.conn = None  # opened on demand by _connect()

    def _connect(self):
        """Opens the psycopg2 connection using the parent's credentials."""
        p = self.p
        self.conn = psycopg2.connect(host=p.host, port=p.port, database=self.db, user=p.user, password=p.password)
        self.conn.autocommit = True

    def queryRaw(self, f):
        """Runs ``f(cursor)`` on a fresh cursor and returns whatever it returns.

        The cursor is closed even when ``f`` raises, so a failing query no
        longer leaks it. The exception itself still propagates."""
        if self.conn is None: self._connect()
        cur = self.conn.cursor()
        try: return f(cur)
        finally: cur.close()

    def query(self, query, *args, mode=0):
        """Executes ``query`` with ``args`` as bound parameters.

        :param mode: 0 returns the fetched rows (None if the statement has no
            result set), 1 returns ``(rows, column names)``, anything else
            returns a table whose first row is the column names"""
        def inner(cur):
            cur.execute(query, args)
            try: res = cur.fetchall()
            except psycopg2.ProgrammingError: res = None  # statement produced nothing to fetch
            if mode == 0: return res
            # description is None for statements without a result set
            desc = [x.name for x in cur.description or ()]
            if mode == 1: return res, desc
            return [desc, *(res or ())]
        return self.queryRaw(inner)

    def __repr__(self): return f"<PgConn host={self.p.host}:{self.p.port} db={self.db}>"
class PgConns:
    """All PostgreSQL connections that share one (host, port, user, password).

    Index it with a database name to get that database's :class:`PgConn`;
    the same object is handed back on every later lookup.
    """

    def __init__(self, host: str, port: int, user: str, password: str):
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.conns = {}  # database name -> PgConn

    def __getitem__(self, key: str):
        conns = self.conns
        if key in conns: return conns[key]
        created = conns[key] = PgConn(self, key)
        return created

    def __repr__(self):
        return "<PgConns host=%s:%s>" % (self.host, self.port)
class PgManager:
    """Singleton registry so connections are shared by everything in the process.

    Calling it with credentials returns the :class:`PgConns` for exactly that
    (host, port, user, password) tuple, creating it on first use.
    """

    def __init__(self):
        self.connss = {}  # (host, port, user, password) -> PgConns

    def __call__(self, host, port, user, password):
        key = (host, port, user, password)
        registry = self.connss
        if key in registry: return registry[key]
        created = registry[key] = PgConns(host, port, user, password)
        return created
mysqlConn = k1.dep("mysql.connector", "mysql-connector-python", "https://pypi.org/project/mysql-connector-python/") # PgManager
class MyConn:
    """A connection to one MySQL database, opened eagerly in the constructor
    and put into autocommit mode."""

    def __init__(self, p: "MyConns", db: str):
        """
        :param p: parent :class:`MyConns` that holds host, port, user and password
        :param db: name of the database to connect to"""
        self.p = p
        self.db = db
        self.conn = None
        self._connect()

    def _connect(self):
        """Opens the mysql.connector connection (utf8mb4) using the parent's credentials."""
        p = self.p
        self.conn = mysqlConn.connect(host=p.host, port=p.port, database=self.db, user=p.user, password=p.password, charset='utf8mb4', collation='utf8mb4_general_ci')
        self.conn.autocommit = True

    def queryRaw(self, f):
        """Runs ``f(cursor)`` on a fresh cursor and returns whatever it returns.

        The cursor is closed even when ``f`` raises."""
        cur = self.conn.cursor()
        try: return f(cur)
        finally: cur.close()

    def query(self, query, *args, mode=0):
        """Executes ``query`` with ``args`` as bound parameters.

        :param mode: 0 returns the fetched rows, 1 returns ``(rows, column names)``,
            anything else returns a table whose first row is the column names"""
        def inner(cur):
            cur.execute(query, args)
            res = cur.fetchall()
            if mode == 0: return res
            # description is None for statements without a result set
            desc = [x[0] for x in cur.description or ()]
            if mode == 1: return res, desc
            return [desc, *res]
        return self.queryRaw(inner)

    def __repr__(self): return f"<MyConn host={self.p.host}:{self.p.port} db={self.db}>"
class MyConns:
    """All MySQL connections that share one (host, port, user, password).

    Index it with a database name to get that database's :class:`MyConn`;
    the connection is created (and therefore opened) on first lookup.
    """

    def __init__(self, host: str, port: int, user: str, password: str):
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.conns = {}  # database name -> MyConn

    def __getitem__(self, key: str):
        conns = self.conns
        if key in conns: return conns[key]
        created = conns[key] = MyConn(self, key)
        return created

    def __repr__(self):
        return "<MyConns host=%s:%s>" % (self.host, self.port)
class MyManager:
    """Registry handing out one :class:`MyConns` per (host, port, user, password)."""

    def __init__(self):
        self.connss = {}  # (host, port, user, password) -> MyConns

    def __call__(self, host, port, user, password):
        key = (host, port, user, password)
        registry = self.connss
        if key in registry: return registry[key]
        created = registry[key] = MyConns(host, port, user, password)
        return created
sqlite3 = k1.dep("sqlite3"); # MyManager
class LiConn:
    """Wrapper around one sqlite3 connection.

    Every query runs under a lock and is committed right after it finishes,
    and the row id of the last statement is remembered in ``_lastrowid``.
    """

    def __init__(self, conn, fn):
        """
        :param conn: an already opened sqlite3 connection
        :param fn: file name the connection was opened with (only used for repr)"""
        self.conn = conn
        self.fn = fn
        self._lastrowid = None
        self.lock = threading.Lock()

    def queryRaw(self, f):
        """Runs ``f(cursor)`` on a fresh cursor under the lock, commits, and returns its result.

        The cursor is closed even when ``f`` raises; in that case nothing is
        committed and the exception propagates."""
        with self.lock:
            cur = self.conn.cursor()
            try:
                res = f(cur)
                self.conn.commit()
                return res
            finally: cur.close()

    def query(self, query, *args, mode=0):
        """Executes ``query`` with ``args`` as bound parameters.

        :param mode: 0 returns the fetched rows, 1 returns ``(rows, column names)``,
            anything else returns a table whose first row is the column names"""
        def inner(cur):
            cur.execute(query, args)
            res = cur.fetchall()
            # keep the previous value if the cursor type has no lastrowid
            self._lastrowid = getattr(cur, "lastrowid", self._lastrowid)
            if mode == 0: return res
            # description is None for statements without a result set
            desc = [x[0] for x in cur.description or ()]
            if mode == 1: return res, desc
            return [desc, *res]
        return self.queryRaw(inner)

    def __repr__(self): return f"<LiConn fn={self.fn} db='default'>"
class LiConns:
    """Holder of the single :class:`LiConn` for one sqlite file.

    Indexing with any key returns that same connection.
    """

    def __init__(self, fn: str):
        # check_same_thread=False: the connection may be used from several threads
        raw = sqlite3.connect(fn, check_same_thread=False)
        self.conn = LiConn(raw, fn)
        self.fn = fn

    def __getitem__(self, key: str):
        return self.conn

    def __repr__(self):
        return "<LiConns fn=%s>" % (self.fn,)
class LiManager:
    """Registry handing out one :class:`LiConns` per sqlite file.

    The call signature mirrors the other managers; only ``host`` (the file
    name) is used here.
    """

    def __init__(self):
        self.connss = {}  # expanded file path -> LiConns

    def __call__(self, host: str, port, user, password):
        # in-memory databases are never cached: each call yields a separate one
        if host == ":memory:": return LiConns(host)
        path = os.path.expanduser(host)
        cached = self.connss.get(path)
        if cached is None:
            cached = self.connss[path] = LiConns(path)
        return cached
# One manager instance per backend; `sql` looks one up by its mode ("pg", "my" or "lite")
pgM = PgManager(); myM = MyManager(); liM = LiManager()
settings = k1lib.settings # module-level alias for the k1lib settings object
@contextmanager
def mysqlCnf(user, password, host, port):
    """Context manager yielding the path of a temporary ``[client]`` option file.

    The file holds the given credentials so they can be handed to the mysql
    command line tools via ``--defaults-file``. It is removed on exit, also
    when the body raises. A falsy password is written as an empty string.
    """
    lines = [
        "[client]",
        f'user = "{user}"',
        f'password = "{password or ""}"',
        f'host = "{host}"',
        f'port = "{port}" ',
    ]
    fn = "\n".join(lines) | cli.file()
    try:
        yield fn
    finally:
        os.remove(fn)
# k1lib.cache objects used for caching table[id]; the two settings callbacks below update every cache in this set
rowCaches = set()
def cb_rowCache_size(s, v):
    """Settings callback: applies the new cache size ``v`` to every registered row cache."""
    for cache in rowCaches:
        cache.maxsize = v
def cb_rowCache_timeout(s, v):
    """Settings callback: applies the new timeout ``v`` to every registered row cache."""
    for cache in rowCaches:
        cache.timeout = v
# Registers the "sql" credential/settings group. Each entry can be overridden by the named environment variable.
# NOTE(review): the default mode is "pg" but the default port is 3306, and K1_SQL_PORT has no converter, so a port
# taken from the environment presumably arrives as a string - confirm this is intended.
settings.cred.add("sql", k1lib.Settings()
    .add("mode", "pg", env="K1_SQL_MODE")
    .add("host", "127.0.0.1", "host name of db server, or db file name if mode='lite'. Warning: mysql's mysqldump won't resolve domain names, so it's best to pass in ip addresses", env="K1_SQL_HOST")
    .add("port", 3306, env="K1_SQL_PORT")
    .add("user", "admin", sensitive=True, env="K1_SQL_USER")
    .add("password", "admin", sensitive=True, env="K1_SQL_PASSWORD")
    .add("verbose", False, "if True, will print out all executed queries")
    # startswith() instead of [0]: an empty K1_SQL_SANITIZE value now means False instead of raising IndexError
    .add("sanitize", False, "if True, will sanitize all string columns with MarkupSafe", env=("K1_SQL_SANITIZE", lambda x: x.strip().lower().startswith("t")))
    .add("cache", k1lib.Settings()
        .add("size", 3000, "Size of the cache, 0 to disable it", cb_rowCache_size, env=("K1_SQL_CACHE_SIZE", lambda x: int(x)))
        .add("timeout", 1, "After this number of seconds, the cached item will expire", cb_rowCache_timeout, env=("K1_SQL_CACHE_TIMEOUT", lambda x: float(x))), "Cache settings for table accesses. I.e table[id]")
, "anything related to sql, used by k1lib.cli.lsext.sql. See docs for that class for more details")
markupsafe = k1.dep("markupsafe", "MarkupSafe", "https://pypi.org/project/MarkupSafe/") # cb_rowCache_timeout
# `default` is the "argument not given" sentinel used by sql.__init__
# `qD` (quote dict) maps each mode to the character wrapped around identifiers: backtick for MySQL, nothing for the others
default = object(); qD = {"my": "`", "pg": "", "lite": ""}
def stripMemView(x):
    """Converts a memoryview to bytes and returns anything else untouched.

    PostgreSQL hands back memoryview objects for bytea columns instead of raw bytes.
    """
    if isinstance(x, memoryview):
        return bytes(x)
    return x
class alldump(list):
    """Marker list type: lines of a dump of all databases."""
[docs]
class sql:
    def __init__(self, host=default, port=default, user=default, password=default, mode=default):
        """Creates a connection to a SQL database.
        Example::

            s = sql("127.0.0.1") # creates a new sql object. Apparently, mysql and mysqldump on the command line bugs out if you put "localhost". If you put localhost here, it should work fine, but `sql(...) | toBytes()` and other functions that use external programs can deny you access
            s.refresh() # refreshes connection any time you encounter strange bugs
            s | ls() # returns List[sqldb], lists out all databases
            s | toBytes() # returns Iterator[str], dumps every database. Yes, it's "toBytes", but it has the feeling of serializing whatever the input is, so it's at least intuitive in that way
            "dump.sql" | s # restores the database using the dump file
            cat("dump.sql") | s # restores the database using the dump file

            db1 = s["db1"] # returns sqldb, gets database named "db1"
            db1 | ls() # returns List[sqltable], list out all tables within this database
            db1 | toBytes() # returns Iterator[str], dumps the database
            users = db1["user"] # gets table named "user", short and simple
            db1.query("select * from users") # queries the database using your custom query
            db1.query("select * from users where user_id=%s", 3) # queries with prepared statement

            users.info() # prints out the first 10 rows of the table and the table schema
            users.cols # returns table's columns as List[str]
            len(users) # returns number of rows
            users.query(...) # can also do a custom query, just like with databases
            users[1] # grabs user with id 1. Returns None if user doesn't exist
            users[1:10] # grabs users with id from 1 (inclusive) to 10 (exclusive)
            users[1].firstname = "Reimu" # sets the first name of user with id 1 to "Reimu"
            users[1] = {"firstname": "Reimu", "lastname": "Hakurei"} # sets first name and last name of user with id 1
            for user in users: user.firstname = "Reimu" # loops through all users and change the firstname to "Reimu"
            users.insert(firstname="Yuyuko", lastname="Saigyouji") # inserts a new row
            users.insertBulk(firstname=["Yuyuko", "Marisa"], lastname=["Saigyouji", "Kirisame"]) # inserts multiple rows at once
            users | toBytes() # dumps this specific table, returns Iterator[str]

            users | cat() | display() # reads entire table, gets first 10 rows and displays it out
            users | (cat() | head(20)) | display() # reads first 20 rows only, then displays the first 10 rows. Query sent is "select * from user limit 20"
            users | (cat() | filt("x == 4", 3) | head(20)) | display() # grabs first 20 rows that has the 4th column equal to 4, then displays the first 10 rows. Query sent is "select user_id, address, balance, age from user where age = 4", assuming the table only has those columns

        Philosophy for these methods is that they should be intuitive to look at and interact with, not for performance.
        If performance is needed, just write raw queries or don't use a database and use applyCl() instead.

        If any of the params here are not specified, they will take on the values from
        "settings.cred.sql", which can be initialized from environment variables.
        Execute and display/print "settings" in a new cell to see all of them.

        Create table commands for different engines, for quick reference.

        Postgresql::

            \"\"\"
            CREATE TABLE users (
                id bigserial primary key,
                name VARCHAR(50),
                age INT,
                time BIGINT,
                groupIds BIGINT[],
                data JSON
            );\"\"\"

        Sqlite::

            \"\"\"
            CREATE TABLE users (
                id INTEGER primary key autoincrement,
                name VARCHAR(50),
                age INT,
                time BIGINT,
                groupIds BIGINT[],
                data JSON
            );\"\"\"

        :param host: host name, ip address, or file name (in case of sqlite)
        :param port: port at the host
        :param user: database user name. If empty then fallback to environment variable ``SQL_USER``, then ``USER``
        :param password: database password. If empty then fallback to environment variable ``SQL_PASSWORD``, else assume database doesn't require one
        :param mode: currently supports 3 values: "my" (MySQL), "pg" (PostgreSQL) and "lite" (SQLite)"""
        if host is default: host = settings.cred.sql.host
        if port is default: port = settings.cred.sql.port
        if user is default: user = settings.cred.sql.user
        if password is default: password = settings.cred.sql.password
        if mode is default: mode = settings.cred.sql.mode
        if mode not in ("my", "pg", "lite"): raise Exception(f"Supports only 'my' (MySQL), 'pg' (PostgreSQL) and 'lite' (SQLite) for now, don't support {mode}")
        self.host = host; self.port = port
        self.user = user or os.environ.get("SQL_USER") or os.environ.get("USER")
        self.password = password or os.environ.get("SQL_PASSWORD")
        self.mode = mode
        self.star = "?" if mode == "lite" else "%s"  # placeholder for bound parameters
        self.manager = {"pg": pgM, "my": myM, "lite": liM}[mode]
        self.dbs = None; self.dbsL = None  # name -> sqldb, and the lower case version. Both filled in by _ls()
        self.conn = self.manager(self.host, self.port, self.user, self.password)[None]  # connection not bound to any database

    def query(self, query, *args):
        """Executes a query. Returns the resulting table (if select query).
        Example::

            s.query("insert into users (name, age) values (%s, %s)", "Reimu", 25)
        """
        return self.conn.query(query, *args)

    @k1.cache(timeout=10, maxsize=1000, name="sql.ls", docs="Caches result of listing out all databases")
    def _ls(self):
        """Lists all databases as List[sqldb] and refreshes ``self.dbs`` and ``self.dbsL``."""
        if self.mode == "my": self.dbs = {e[0]: sqldb(self, e[0]) for e in self.query("show databases")}
        elif self.mode == "pg": self.dbs = {e[0]: sqldb(self, e[0]) for e in self.query("select datname from pg_database where datistemplate=false")}
        elif self.mode == "lite": self.dbs = {"default": sqldb(self, "default")}
        self.dbsL = {k.lower(): v for k, v in self.dbs.items()}
        return list(self.dbs.values())

    def __repr__(self): return f"<sql mode={self.mode} host={self._host}>"

    @property
    def _host(self) -> str:
        """Short host description for reprs: file name for sqlite, ``host:port`` otherwise."""
        if self.mode == "lite": return self.host.split(os.sep)[-1]
        else: return f"{self.host}:{self.port}"

    def _cnfCtx(self): return mysqlCnf(self.user, self.password, self.host, self.port)

    def _pgCmd(self, prog: str) -> str:
        """Builds ``PGPASSWORD=... <prog> -h ... -p ... -U ...`` with every value shell-quoted,
        so passwords or names containing spaces or shell metacharacters can't break the command."""
        qt = lambda x: shlex.quote(str(x))
        return f"PGPASSWORD={qt(self.password or '')} {prog} -h {qt(self.host)} -p {qt(self.port)} -U {qt(self.user)}"

    def _toBytes(self):
        """Dumps every database using mysqldump/pg_dumpall. Returns :class:`alldump`."""
        if self.mode == "my":
            with self._cnfCtx() as fn: k1lib.depCli("mysqldump"); return alldump(None | cli.cmd(f"mysqldump --defaults-file={shlex.quote(fn)} --single-transaction --hex-blob --all-databases"))
        elif self.mode == "pg": k1lib.depCli("pg_dumpall"); return alldump(None | cli.cmd(self._pgCmd("pg_dumpall")))
        else: raise Exception(f"All databases dump of mode {self.mode} is not supported yet")

    def __ror__(self, it): # restoring a backup
        """Restores a backup. ``it`` is either a dump file name, or the dump's lines."""
        if self.mode == "my":
            def restore(fn):
                k1lib.depCli("mysql")
                with self._cnfCtx() as cnfFn: None | cli.cmd(f"mysql --defaults-file={shlex.quote(cnfFn)} < {shlex.quote(fn)}") | cli.ignore()
            if isinstance(it, str): restore(it)
            else:
                fn = it | cli.file()
                try: restore(fn)
                finally: os.remove(fn)  # temp file goes away even if the restore fails
        elif self.mode == "pg":
            if isinstance(it, str): fn = it; isStr = True
            elif not isinstance(it, alldump): raise Exception("The sql commands piped in is from a specific database, while you haven't chosen a particular database from this sql connection yet. Do something like `s = sql(...); db = s['some_db_name']; ['some sql'] | db`, instead of `['some sql'] | s` like you're doing now")
            else: fn = it | cli.file(); isStr = False
            try: None | cli.cmd(f"{self._pgCmd('psql')} -d postgres < {shlex.quote(fn)}") | cli.ignore()
            finally:
                if not isStr: os.remove(fn)
        else: raise Exception(f"Restoring database from .sql file of mode {self.mode} is not supported yet")

    def __len__(self): return len(self._ls())

    def __getitem__(self, idx):
        """Gets the database named ``idx``, falling back to a case-insensitive match.

        :raises Exception: if no such database exists"""
        self._ls()
        if idx in self.dbs: return self.dbs[idx]
        # only strings can be matched case-insensitively; anything else falls through to the error
        if isinstance(idx, str) and idx.lower() in self.dbsL: return self.dbsL[idx.lower()]
        raise Exception(f"Database '{idx}' does not exist, can't retrieve it")

    def __iter__(self): return iter(self._ls())

    def __delitem__(self, idx):
        """Drops the database named ``idx``. Only supported for MySQL and PostgreSQL."""
        if not isinstance(idx, str): raise Exception("Only supports deleting with database names. Use this like `del s['some_db']`")
        self[idx]  # raises if the database doesn't exist
        q = qD[self.mode]
        if self.mode == "my": return self.query(f"drop database {q}{idx}{q}")
        elif self.mode == "pg": return self.query(f"drop database {q}{idx}{q} with (force)")
        else: raise Exception("Currently only support dropping databases in mysql and postgresql")
class dbdump(list):
    """Marker list type: lines of a dump of a single database."""
[docs]
class sqldb:
    def __init__(self, sql:sql, name:str):
        """A sql database representation. Not expected to be instantiated by you. See also: :class:`sql`"""
        self.sql = sql; self.name = name
        self.tbls = {}; self.tblsL = {}  # name -> sqltable, and the lower case version. Both filled in by _ls()
        s = sql; self.conn = s.manager(s.host, s.port, s.user, s.password)[name]

    def query(self, query, *args, mode=0):
        """Executes a query in this database. Returns the resulting table (if select query).
        Example::

            db = ...
            db.query("insert into users (name, age) values (%s, %s)", "Reimu", 25)

        :param mode: forwarded to the connection. 0 (default) returns the rows, 1 returns
            ``(rows, column names)``, 2 returns the rows with the column names as first row"""
        return self.conn.query(query, *args, mode=mode)

    @k1.cache(timeout=10, maxsize=1000, name="sqldb.ls", docs="Caches result of listing out all tables")
    def _ls(self):
        """Lists all tables as List[sqltable] and refreshes ``self.tbls`` and ``self.tblsL``."""
        mode = self.sql.mode
        if mode == "my": self.tbls = {e[0]: sqltable(self.sql, self, e[0]) for e in self.query("show tables")}
        if mode == "pg": self.tbls = {e[0]: sqltable(self.sql, self, e[0]) for e in self.query("select table_name from information_schema.tables where table_schema = 'public'")}
        if mode == "lite": self.tbls = {e[0]: sqltable(self.sql, self, e[0]) for e in self.query("select name from sqlite_master where type='table'")}
        self.tblsL = {k.lower(): v for k, v in self.tbls.items()}; return list(self.tbls.values())

    def __repr__(self): return f"<sqldb host={self.sql._host} db={self.name}>"

    def _pgCmd(self, prog: str) -> str:
        """Builds ``PGPASSWORD=... <prog> -h ... -p ... -U ... -d <this db>`` with every value shell-quoted."""
        s = self.sql; qt = lambda x: shlex.quote(str(x))
        return f"PGPASSWORD={qt(s.password or '')} {prog} -h {qt(s.host)} -p {qt(s.port)} -U {qt(s.user)} -d {qt(self.name)}"

    def _toBytes(self):
        """Dumps this database using mysqldump/pg_dump. Returns :class:`dbdump`."""
        if self.sql.mode == "my":
            with self.sql._cnfCtx() as fn: k1lib.depCli("mysqldump"); return dbdump(None | cli.cmd(f"mysqldump --defaults-file={shlex.quote(fn)} --single-transaction --hex-blob --databases {shlex.quote(self.name)}"))
        elif self.sql.mode == "pg": k1lib.depCli("pg_dump"); return dbdump(None | cli.cmd(self._pgCmd("pg_dump")))
        else: raise Exception(f"Database dump of mode {self.sql.mode} is not supported yet")

    def __ror__(self, it):
        """Restores a dump into this database. ``it`` is either a dump file name, or the dump's lines."""
        mode = self.sql.mode; q = qD[mode]
        if mode == "my":
            if isinstance(it, str):
                with open(it, "r") as f: it = f.readlines() # loads file to ram
            return self.sql.__ror__([f"USE {q}{self.name}{q};", *it])
        elif mode == "pg":
            if isinstance(it, str): fn = it; isStr = True
            elif isinstance(it, alldump): self.sql.__ror__(it); return
            else: fn = it | cli.file(); isStr = False
            try: None | cli.cmd(f"{self._pgCmd('psql')} < {shlex.quote(fn)}") | cli.ignore()
            finally:
                if not isStr: os.remove(fn)
        # any other mode used to fall through silently, making a restore look successful
        else: raise Exception(f"Restoring database from .sql file of mode {mode} is not supported yet")

    def __len__(self): return len(self._ls())

    def __getitem__(self, idx):
        """Gets a table by name (case-insensitive fallback), or by position in the table list.

        Returns None when a table name is given and no table matches."""
        res = self._ls()
        if isinstance(idx, str):
            if idx in self.tbls: return self.tbls[idx]
            if idx.lower() in self.tblsL: return self.tblsL[idx.lower()]
        else: return res[idx]

    def __iter__(self): return iter(self._ls())

    def __delitem__(self, idx):
        """Drops the table named ``idx``.

        :raises Exception: if the table doesn't exist, or ``idx`` is not a string"""
        if isinstance(idx, str):
            if self.sql.mode == "pg": idx = idx.lower()
            lsres = [x for x in self._ls() if x.name == idx]
            q = qD[self.sql.mode]  # quote the identifier the same way the rest of the module does
            if len(lsres) == 1: return self.query(f"drop table {q}{idx}{q}")
            else: raise Exception(f"Table '{idx}' does not exist, can't delete it")
        else: raise Exception("Only supports deleting with table names. Use this like `del db['some_table']`")
class tbldump(list):
    """Marker list type: lines of a dump of a single table."""
[docs]
class sqltable: # sqltable
[docs]
def __init__(self, sql, sqldb, name:str):
    """A sql table representation. Not expected to be instantiated by you. See also: :class:`sql`"""
    self.sql = sql
    self.sqldb = sqldb
    self.name = name
    self._cols = None        # column names, filled in lazily by .cols
    self.__col2Type = None   # column name -> lower case type, filled in lazily by _col2Type()
def _col2Type(self):
    """Returns dict of column name -> lower case column type, computed once from _describe()."""
    if self.__col2Type is None:
        # sqlite's table_info rows start with a "cid" column, so name and type sit one position later
        nameIdx, typeIdx = (1, 2) if self.sql.mode == "lite" else (0, 1)
        self.__col2Type = {row[nameIdx]: row[typeIdx].lower() for row in self._describe()[1:]}
    return self.__col2Type
def _cat(self, ser):
    """Reads the table, folding leading head() and cut() clis of ``ser`` into the sql query.

    Clis that can't be folded are kept and applied to the query result, in their original order.

    :param ser: serial cli whose ``.clis`` are inspected
    :return: list of :class:`sqlrow`"""
    cols = self.cols; _2 = [] # clis that can't be optimized, stashed away to be merged with ser later on
    q = qD[self.sql.mode]; clis = deque(ser.clis); o1 = None; o2 = None # cut() and head() opts
    while len(clis) > 0:
        c = clis.popleft()
        if isinstance(c, cli.filt): _2.append(c); break # TODO: add optimizations for filt
        elif o2 is None and isinstance(c, cli.head):
            # None has to be tested first, as round(None) and None < 0 raise
            if c.n is None or c.inverted or c.n < 0 or round(c.n) != c.n: _2.append(c); break
            else: o2 = f"limit {int(c.n)}"; continue # int() so that head(20.0) doesn't emit "limit 20.0"
        elif o1 is None and isinstance(c, cli.cut):
            if isinstance(c.columns, slice): _2.append(c); o1 = 0; continue # 0: cut seen, but still select every column
            else:
                o1 = ", ".join([f"{q}{col}{q}" for col in cols | cli.rows(*c.columns)])
                if len(c.columns) == 1: _2.append(cli.item().all() | cli.aS(list)) # unwraps the 1-column rows
        else: _2.append(c); break
    o1 = o1 or ", ".join([f"{q}{col}{q}" for col in cols])
    query = f"select {o1} from {q}{self.name}{q} {o2 or ''}"
    sql = self.sql; return [sqlrow(sql, self, row) for row in self.sqldb.query(query) | cli.serial(*_2, *clis)]
@property
def cols(self):
    """Get column names.
    Example::

        table.cols # returns List[str], like ["id", "userId", "name", ...]
    """
    if not self._cols:
        # position of the column name inside a _describe() row, per engine
        nameIdx = {"my": 0, "pg": 0, "lite": 1}[self.sql.mode]
        self._cols = self._describe()[1:] | cli.cut(nameIdx) | cli.deref()
    return self._cols
@lru_cache
def _describe(self):
    """Returns the table schema as a table whose first row is the header.

    Column layout depends on the engine (see the header inserted in each branch).
    Returns None for an unknown mode.

    NOTE(review): lru_cache on a method keeps every instance alive for the cache's
    lifetime and never refreshes the schema. Left as is, as other code may rely on it."""
    mode = self.sql.mode
    if mode == "my": return self.sqldb.query(f"describe `{self.name}`") | cli.insert(["Field", "Type", "Null", "Key", "Default", "Extra"]) | cli.deref()
    # table name is bound as a parameter, and restricted to the 'public' schema, same as the table listing
    if mode == "pg": return self.sqldb.query("select column_name, data_type, is_nullable, column_default, ordinal_position from information_schema.columns where table_schema = 'public' and table_name = %s order by ordinal_position", self.name) | cli.insert(["column_name", "data_type", "is_nullable", "column_default", "ordinal_position"]) | cli.deref()
    if mode == "lite": return self.sqldb.query(f"pragma table_info([{self.name}])") | cli.insert(["cid", "name", "type", "notnull", "dflt_value", "pk"]) | cli.deref()
[docs]
def insert(self, **kwargs):
    """Inserts a row and returns it.
    Example::

        table = ...
        table.insert(firstname="Yuyuko", lastname="Saigyouji")

    Dicts and values of json columns are serialized with json.dumps. Sets, tuples and
    values of array columns are sent as lists in PostgreSQL and as json strings otherwise.
    Strings are escaped with MarkupSafe if ``settings.cred.sql.sanitize`` is on.
    """
    mode = self.sql.mode; q = qD[mode]; table = f"{q}{self.name}{q}"
    keys = ", ".join([f"{q}{x}{q}" for x in kwargs]); values = ", ".join([self.sql.star]*len(kwargs))
    san = settings.cred.sql.sanitize; vs = []; _typeD = self._col2Type()
    for k, v in kwargs.items():
        colType = _typeD.get(k, "")
        if isinstance(v, dict) or colType == "json": vs.append(json.dumps(v))
        elif isinstance(v, (set, tuple)) or colType == "array" or colType.endswith("[]"):
            if v is None: vs.append(None) # used to crash on list(None) outside of PostgreSQL
            else: vs.append(list(v) if mode == "pg" else json.dumps(list(v)))
        elif san and isinstance(v, str): vs.append(markupsafe.escape(v))
        else: vs.append(v)
    if mode == "my" or mode == "lite":
        self.query(f"insert into {table} ({keys}) values ({values})", *vs)
        if mode == "my": return self[self.query("select last_insert_id()")[0][0]]
        # the insert ran on the database's connection, so that's the one that knows the row id
        if mode == "lite": return self[self.sqldb.conn._lastrowid]
    # return the actual id column instead of assuming it's called "id"
    elif mode == "pg": return self[self.query(f"insert into {table} ({keys}) values ({values}) returning {self.idCol}", *vs)[0][0]]
[docs]
def insertBulk(self, **kwargs):
    """Inserts multiple rows at once.
    Example::

        table = ...
        table.insertBulk(firstname=["Yuyuko", "Marisa"], lastname=["Saigyouji", "Kirisame"])

    Every keyword is a column, holding one value per row. Values are converted the same
    way as in :meth:`insert`, with the kind of conversion decided by the first value of
    each column. Does nothing if there are no columns or no rows.

    :raises Exception: if the columns don't all have the same number of rows"""
    lengths = {len(x) for x in kwargs.values()}
    if len(lengths) == 0: return
    if len(lengths) != 1: raise Exception("Different parameters have different number of rows!")
    if lengths == {0}: return # no rows. Also guards the v[0] lookups below
    mode = self.sql.mode; q = qD[mode]; table = f"{q}{self.name}{q}"
    keys = ", ".join([f"{q}{x}{q}" for x in kwargs]); values = ", ".join([self.sql.star]*len(kwargs))
    san = settings.cred.sql.sanitize; vs = []; _typeD = self._col2Type()
    for k, v in kwargs.items():
        colType = _typeD.get(k, "")
        if isinstance(v[0], dict) or colType == "json": vs.append([json.dumps(x) for x in v])
        elif isinstance(v[0], (set, tuple)) or colType == "array" or colType.endswith("[]"):
            # converted per row. The whole column used to be serialized as if it was a single value
            vs.append([None if x is None else (list(x) if mode == "pg" else json.dumps(list(x))) for x in v])
        elif san and isinstance(v[0], str): vs.append([markupsafe.escape(x) for x in v])
        else: vs.append(v)
    rows = list(zip(*vs)) # column-major -> row-major
    if mode == "pg":
        def run(cur): # single multi-row statement, rendered client-side by psycopg2
            rowFmt = "(" + ",".join(["%s"]*len(kwargs)) + ")"
            cur.execute(f"insert into {table} ({keys}) values ".encode() + b','.join(cur.mogrify(rowFmt, x) for x in rows))
    else:
        def run(cur): # mogrify() is psycopg2-only, so other engines go through DB-API executemany
            cur.executemany(f"insert into {table} ({keys}) values ({values})", rows)
    return self.sqldb.conn.queryRaw(run)
def _update(self, _idx, **kwargs):
    """Updates a row.
    Example::

        table = ...
        table._update(3, firstname="Youmu", lastname="Konpaku")

    Values are converted the same way as in :meth:`insert`. Does nothing if no
    columns are given.

    :param _idx: value of the id column of the row to update"""
    if len(kwargs) == 0: return # "update t set where ..." is not valid sql
    mode = self.sql.mode; q = qD[mode]; star = self.sql.star
    p1 = ", ".join([f"{q}{k}{q} = {star}" for k in kwargs])
    san = settings.cred.sql.sanitize; values = []; _typeD = self._col2Type()
    for k, v in kwargs.items():
        colType = _typeD.get(k, "")
        if isinstance(v, dict) or colType == "json": values.append(json.dumps(v))
        elif isinstance(v, (set, tuple)) or colType == "array" or colType.endswith("[]"):
            if v is None: values.append(None) # used to crash on list(None) outside of PostgreSQL
            else: values.append(list(v) if mode == "pg" else json.dumps(list(v)))
        elif san and isinstance(v, str): values.append(markupsafe.escape(v))
        else: values.append(v)
    # the id is bound as a parameter too, so string ids work and can't inject sql
    self.query(f"update {q}{self.name}{q} set {p1} where {q}{self.idCol}{q} = {star}", *values, _idx)
[docs]
def info(self, out=False):
    """Preview table.
    Example::

        table = ...
        table.info()

    Prints a status line, the first and last 9 rows, the table schema and (MySQL and
    PostgreSQL only) the indexes.

    :param out: if True, returns a list of lines instead of printing them out"""
    def gen():
        desc = self._describe() | cli.deref(); cols = self.cols; mode = self.sql.mode; q = qD[mode]
        s = ", ".join([f"{q}{e}{q}" for e in cols]); table = f"{q}{self.name}{q}"
        if mode == "my":
            status = self.sqldb.query("show table status like %s", self.name)[0]
            status = f"engine:{status[1]}, #rows(approx):{status[4]}, len(row):{status[5]}, updated:{status[12]}UTC, created:{status[11]}UTC"
        elif mode == "lite":
            nrows = self.query(f"select max(rowid) from {table}")[0][0]
            try: ncols = len(self.query(f"select * from {table} limit 1")[0])
            except Exception: ncols = "?" # e.g. empty table
            status = f"#rows(approx):{nrows}, len(row):{ncols}"
        else: nrows = self.sqldb.query(f"select count(*) from {table}")[0][0]; status = f"{nrows} rows total"
        idCol = "rowid" if mode == "lite" else self.idCol; print(f"Table `{self.name}` ({status})\n")
        self.sqldb.query(f"select {s} from {table} order by {idCol} limit 9") | (cli.aS(stripMemView) | cli.aS(repr) | cli.head(50)).all(2) | cli.insert(cols) | cli.display()
        try: # print tails, best effort only
            print("...\n..."); self.sqldb.query(f"select {s} from {table} order by {idCol} desc limit 9")\
                | cli.reverse() | (cli.aS(stripMemView) | cli.aS(repr) | cli.head(50)).all(2) | cli.insert(cols) | cli.display()
        except Exception: pass
        print("\nTable format:"); desc | cli.display(None)
        # goes to the connection directly, since mode=2 (header row included) is a connection-level option
        if mode == "my": print("\nIndexes:"); self.sqldb.conn.query(f"show indexes from {table}", mode=2) | cli.display()
        elif mode == "pg": print("\nIndexes:"); self.sqldb.query("SELECT tablename, indexname, indexdef FROM pg_indexes WHERE schemaname = 'public' and tablename = %s", self.name) | cli.display()
    if out:
        with k1.captureStdout() as captured: gen()
        return captured()
    else: gen()
def _ls(self):
    """Listing a table is the same as previewing it, see :meth:`info`."""
    return self.info()
[docs]
def query(self, query, *args):
    """Executes a query in this table's database. Returns the resulting table (if select query).
    Example::

        table = ...
        table.query("insert into users (name, age) values (%s, %s)", "Reimu", 25)
    """
    db = self.sqldb
    return db.query(query, *args)
[docs]
def queryDesc(self, query, *args):
    """Executes a query in this table's database, and returns ``(rows, column names)``.

    Delegates to the database's own ``queryDesc`` if it has one. Otherwise falls back
    to the connection's ``mode=1`` query, which returns the rows together with the
    column names - presumably what a "query with description" is meant to return."""
    dbQueryDesc = getattr(self.sqldb, "queryDesc", None)
    if dbQueryDesc is not None: return dbQueryDesc(query, *args)
    return self.sqldb.conn.query(query, *args, mode=1)
[docs]
def select(self, query, *args):
    """Pretty much identical to :meth:`query`, but this postprocess the results
    a little bit, and package them into :class:`sqlrow` objects. Example::

        table = ...
        table.select("where name = %s", "Reimu")
        table.select("select * from users where name = %s", "Reimu")

    The query either starts with "select *" (any casing), or leaves out the
    "select * from table" part entirely.

    :raises Exception: if the query starts with "select" but not with "select *" """
    stripped = query.strip(); lq = stripped.lower(); q = qD[self.sql.mode]
    if lq.startswith("select") and not lq.startswith("select *"): raise Exception(".select() requires the query to start with 'select *', or remove 'select * from table' statement entirely!")
    # only the leading "select *" is expanded into the column list. Any other occurrence
    # further in (subquery, string literal) is left as the caller wrote it
    if lq.startswith("select *"): query = f"select {self.scols}{stripped[len('select *'):]}"
    else: query = f"select {self.scols} from {q}{self.name}{q} {query}"
    res = self.query(query, *args); sql = self.sql; return [sqlrow(sql, self, e) for e in res]
def __repr__(self):
    host = self.sql._host; db = self.sqldb.name
    return f"<sqltable host={host} db={db} table={self.name}>"
def _toBytes(self):
    """Dumps this table using mysqldump/pg_dump. Returns :class:`tbldump`.

    Every value placed on the command line is shell-quoted, so credentials or names
    containing spaces or shell metacharacters can't break the command."""
    s = self.sql; qt = lambda x: shlex.quote(str(x))
    if s.mode == "my":
        with s._cnfCtx() as fn: return tbldump(None | cli.cmd(f"mysqldump --defaults-file={qt(fn)} --single-transaction --hex-blob {qt(self.sqldb.name)} {qt(self.name)}"))
    elif s.mode == "pg": return tbldump(None | cli.cmd(f"PGPASSWORD={qt(s.password or '')} pg_dump -h {qt(s.host)} -p {qt(s.port)} -U {qt(s.user)} -d {qt(self.sqldb.name)} -t {qt(self.name)}"))
    else: raise Exception(f"Table dump of mode {self.sql.mode} is not supported yet")
[docs]
def __ror__(self, it):
    """Restoring into a table is handled by the table's database."""
    db = self.sqldb
    return db.__ror__(it)
def __len__(self):
    """Number of rows in the table."""
    q = qD[self.sql.mode] # quote the table name, same as the other generated queries
    return self.query(f"select count(*) from {q}{self.name}{q}")[0][0]
def __iter__(self):
    """Iterates over all rows of this table as :class:`sqlrow`.

    Used to query a hard-coded table called "valves", and to return a list, which
    iter() rejects. Note that a later ``__iter__`` definition in this class
    replaces this one."""
    return iter(self.select(f"select * from {self.name}"))
@property
def scols(self):
    """All column names as one string: quoted for the current engine and comma separated."""
    quote = qD[self.sql.mode]
    quoted = [f"{quote}{col}{quote}" for col in self.cols]
    return ", ".join(quoted)
[docs]
def lookup(self, **kwargs):
    """Convenience function to lookup 1 instance with the specified value.
    Example::

        user = users.lookup(firstname="Reimu")
        # multiple columns work too
        user = users.lookup(firstname="Reimu", lastname="Hakurei")

    Returns None if nothing matches. Without any condition, returns the first row
    the database hands back.
    """
    q = qD[self.sql.mode]
    p1 = " AND ".join([f"{q}{k}{q} = {self.sql.star}" for k in kwargs])
    where = f" where {p1}" if p1 else "" # a bare "where" is not valid sql
    res = self.query(f"select {self.scols} from {q}{self.name}{q}{where} limit 1", *kwargs.values())
    return None if len(res) == 0 else sqlrow(self.sql, self, res[0])
@property
def idCol(self):
    """Name of the column used for indexing operations.

    PostgreSQL doesn't reliably keep the column order, so there a column named "id"
    is preferred if it exists. Everything else uses the first column."""
    cols = self.cols
    if self.sql.mode == "pg" and any(x == "id" for x in cols): return "id"
    return cols[0]
@property
def idColPos(self):
    """Position of the id column within a row: looked up for PostgreSQL, 0 otherwise."""
    idCol = self.idCol
    if self.sql.mode != "pg": return 0
    for i, x in enumerate(self.cols):
        if x == idCol: return i
    return [][0] # same IndexError as before when the column is missing
_cache = k1.cache(settings.cred.sql.cache.size, settings.cred.sql.cache.timeout, name="sqltable.getitem", docs="Caches result of table[idx]"); rowCaches.add(_cache)
@_cache
def __getitem__(self, idx):
    """Grabs rows by id.

    - ``table[None]``: None
    - ``table[3]`` or ``table["abc"]``: the :class:`sqlrow` with that id, or None if it doesn't exist
    - ``table[1:10]``: list with one entry per id in the slice, None where the row doesn't exist.
      Without a stop, goes up to the largest id currently in the table
    - ``table[[1, 5, 9]]``: same, for any iterable of ints

    :raises Exception: for any other kind of index"""
    if idx is None: return None
    mode = self.sql.mode; idCol = self.idCol; idColPos = self.idColPos
    q = qD[mode]; scols = ", ".join([f"{q}{c}{q}" for c in self.cols]); sql = self.sql
    if isinstance(idx, (int, str)):
        res = self.query(f"select {scols} from {self.name} where {idCol} = {self.sql.star}", idx)
        if len(res) == 0: return None
        return sqlrow(sql, self, res[0])
    elif isinstance(idx, slice):
        stop = idx.stop
        if stop is None:
            maxId = self.query(f"select max({idCol}) from {self.name}")[0][0]
            if maxId is None: return [] # empty table, max() gives null
            stop = maxId + 1
        idxs = list(range(stop)[idx])
    else:
        idxs = list(idx)
        # does not support string because that might lead to sql injection. Can fix, but too lazy
        if not all(isinstance(e, int) for e in idxs): raise Exception("Only support table indexing of integers or strings or slices or list[int]")
    if len(idxs) == 0: return []
    # built from the materialized ids, so one-shot iterables (generators) work too
    res = self.query(f"select {scols} from {self.name} where {idCol} in ({', '.join([str(e) for e in idxs])})")
    d = {row[idColPos]: row for row in res}
    return [sqlrow(sql, self, d[i]) if i in d else None for i in idxs]
def __setitem__(self, idx, value):
    """Update an existing row: ``table[idx] = {"col": newValue, ...}``.

    :param idx: id of the row to change; the row must already exist
    :param value: dict of column name -> new value, forwarded to ``self._update``
    :raises Exception: if ``value`` is not a dict, or no row with that id exists"""
    if not isinstance(value, dict):
        raise Exception("Only accepts setting elements with dicts")
    existing = self[idx]
    if existing is None:
        raise Exception(f"Can't set element with id {idx}, it doesn't seem to exist. Use table.insert(col1=value1, col2=value2) instead")
    self._update(idx, **value)
def __iter__(self):
    """Return an iterator over whatever ``self | cli.cat()`` produces for this table."""
    content = self | cli.cat()
    return iter(content)
def __delitem__(self, idx):
    """Delete rows: ``del table[5]``, ``del table["key"]`` or ``del table[2:10]``.

    Ids that don't exist are silently ignored.

    :param idx: int, str or slice, resolved through ``self[idx]``
    :raises Exception: for any other index type"""
    idCol = self.idCol; idColPos = self.idColPos
    if isinstance(idx, (int, str)):
        row = self[idx]
        if row: self.query(f"delete from {self.name} where {idCol} = {self.sql.star}", row[idColPos])
    elif isinstance(idx, slice):
        # self[idx] has a None entry for every id in the slice that doesn't exist
        ids = [row[idColPos] for row in self[idx] if row]
        if len(ids) > 0: # guard on the real ids, an empty "in ()" is a syntax error
            # One placeholder per id: works for non-string ids (str.join would
            # raise on ints) and keeps the values out of the query text
            placeholders = ", ".join([self.sql.star] * len(ids))
            self.query(f"delete from {self.name} where {idCol} in ({placeholders})", *ids)
    else: raise Exception("Only support table indexing of integers or strings or slices")
[docs]
class _SqlRowAttributeError(AttributeError, KeyError):
    """Raised by :class:`sqlrow` for unknown attributes. It is an AttributeError,
    so ``hasattr``/``getattr(..., default)``/``copy``/``pickle`` behave, and also
    a KeyError, so code that caught the previously-raised KeyError keeps working."""
class sqlrow:
    def __init__(self, sql, sqltable, row):
        """Represents one row of a table. Not intended to be instantiated directly.

        Column values are readable as attributes (``row.name``) and by position
        (``row[0]``). Assigning to a column attribute writes the change to the
        database through ``sqltable._update``.

        :param sql: owning sql object, its ``mode`` ("lite", "pg", ...) is consulted
        :param sqltable: table this row belongs to
        :param row: raw sequence of values, in the order of ``sqltable.cols``"""
        # while the sentinel is on, __setattr__ stores everything as plain
        # attributes instead of treating it as a column update
        self._ab_sentinel = True
        row = [stripMemView(x) for x in row]; self._sql = sql; self._sqltable = sqltable
        self._row = row; _typeD = sqltable._col2Type(); self._data = {}
        if sql.mode == "lite":
            # in "lite" mode, columns typed "json" or "<something>[]" hold json text
            for k, v in zip(sqltable.cols, row):
                _type = _typeD.get(k, ""); isJson = _type.endswith("[]") or _type == "json"
                # NULL stays None, json.loads(None) would raise TypeError
                self._data[k] = json.loads(v) if isJson and v is not None else v
        else: self._data = {k:v for k,v in zip(sqltable.cols, row)}
        self._ab_sentinel = False
    def __getitem__(self, idx): return self._row[idx]
    def __getattr__(self, attr):
        """Column lookup by name (case-insensitive in "pg" mode). Only called
        when normal attribute lookup has already failed."""
        # Go through __dict__ directly: on a half-built instance (copy/unpickle)
        # "self._data" would re-enter __getattr__ and recurse forever
        state = self.__dict__; data = state.get("_data")
        if data is not None:
            if attr in data: return data[attr]
            sql = state.get("_sql")
            if sql is not None and sql.mode == "pg" and attr.lower() in data: return data[attr.lower()]
        raise _SqlRowAttributeError(attr)
    def __setattr__(self, attr, value):
        """Plain attributes are stored on the instance. Column names are also
        written to the database and mirrored into the cached row and data."""
        state = self.__dict__
        # a missing sentinel means the instance is still being built, treat it as on
        if attr == "_ab_sentinel" or state.get("_ab_sentinel", True): state[attr] = value; return
        attr = attr.lower() if self._sql.mode == "pg" else attr
        if attr not in self._sqltable.cols: state[attr] = value; return
        self._sqltable._update(self._row[self._sqltable.idColPos], **{attr: value}); state[attr] = value
        idx = dict(zip(self._sqltable.cols, range(len(self._row))))[attr]
        row = list(self._row); row[idx] = value; state["_row"] = row; self._data[attr] = value
    def json(self):
        """Returns the column name -> value dict backing this row (not a copy)."""
        return self._data
    def __len__(self): return len(self._row)
    def __repr__(self):
        ans = []
        for k, elem in self._data.items():
            # str and bytes values are shown with their length, truncated to 100 elements
            if isinstance(elem, str): ans.append(f"{k}=({len(elem)} len) {json.dumps(elem[:100])}..." if len(elem) > 100 else f"{k}=({len(elem)} len) {json.dumps(elem)}")
            elif isinstance(elem, bytes): ans.append(f"{k}=({len(elem)} len) {elem[:100]}..." if len(elem) > 100 else f"{k}=({len(elem)} len) {elem}")
            else: ans.append(f"{k}={elem}")
        return "(" + ", ".join(ans) + ")"
# append the four SQL wrapper classes to the existing "atomic" type list
settings.cli.atomic.baseAnd = (
    *settings.cli.atomic.baseAnd,
    sql, sqldb, sqltable, sqlrow,
)
boto3 = k1.dep("boto3")
# Guarded so that tests still run when the minio creds were already
# registered earlier (the author loads them from a startup.py)
if "minio" not in settings.cred.__dict__:
    settings.cred.add(
        "minio",
        k1lib.Settings()
            .add("host", "https://localhost:9000", env="K1_MINIO_HOST")
            .add("access_key", "", sensitive=True, env="K1_MINIO_ACCESS_KEY")
            .add("secret_key", "", sensitive=True, env="K1_MINIO_SECRET_KEY"),
        "anything related to minio buckets, used by k1lib.cli.lsext.minio",
    )
[docs]
def minio(host=None, access_key=None, secret_key=None) -> "s3":
    """Shortcut for building a :class:`s3` object that talks to a minio server.
    Any parameter left unspecified falls back to the matching value in
    "settings.cred.minio". Example::

        s = minio() # returns s3 instance
        s | ls()    # list all buckets, all other normal operations

    :param host: looks like "http://localhost:9000"
    :param access_key: access key id handed to boto3
    :param secret_key: secret access key handed to boto3"""
    endpoint = host or settings.cred.minio.host
    keyId = access_key or settings.cred.minio.access_key
    secret = secret_key or settings.cred.minio.secret_key
    client = boto3.client(
        's3',
        endpoint_url=endpoint,
        aws_access_key_id=keyId,
        aws_secret_access_key=secret,
        region_name='us-west-2',
    )
    return s3(client)
[docs]
class s3:
    def __init__(self, client):
        """Wraps a boto3 S3 client so that it plays along with ls() and cat().
        Example::

            client = boto3.client("s3", ...) # put your credentials and details here
            db = s3(client)                  # creates an S3 manager
            db | ls()                        # lists all buckets accessible
            bucket = db | ls() | item()      # grabs the first bucket, returns object of type s3bucket
            bucket = db["bucket-name"]       # or you can instantiate the bucket directly
            bucket | ls()                    # lists all objects within this bucket
            bucket | ls() | grep("\\.so")    # grabs all .so files from the bucket
            obj = bucket | ls() | item()     # grabs the first object within this bucket, returns object of type s3obj
            obj = bucket["some_key"]         # or, grab a specific object
            obj.key, obj.size, obj.lastModified # some fields directly accessible
            del bucket["some_key"]           # deletes the object from the bucket
            obj = "abc\\ndef" | bucket.upload("somekey") # uploads a file by piping the contents in, returns s3obj
            obj = b"abc\\ndef"| bucket.upload()          # same as above, but with an auto-generated key

        This mostly offers interoperability with ls() and cat(), so that you can
        write relatively intuitive code, but fundamentally provides no upsides

        :param client: boto3 client"""
        self._cachedLs = None # bucket name -> s3bucket, filled in by _ls()
        self.client = client
    def _ls(self):
        """Lists the buckets again, replaces the cache and returns them as a list."""
        listing = self.client.list_buckets()["Buckets"]
        fresh = {}
        for entry in listing:
            fresh[entry["Name"]] = s3bucket(self.client, entry["Name"])
        self._cachedLs = fresh
        return list(fresh.values())
    def __len__(self):
        if self._cachedLs is None:
            self._ls()
        return len(self._cachedLs)
    def __getitem__(self, name):
        if self._cachedLs is None:
            self._ls()
        if name not in self._cachedLs:
            self._ls() # the bucket may have been created after the last listing
            if name not in self._cachedLs:
                raise Exception(f"Bucket '{name}' doesn't exist")
        return self._cachedLs[name]
    def __repr__(self):
        return "<kaws.s3 client>"
# generates the keys used by s3bucket.upload() when the caller doesn't supply one
_s3bucketUploader_autoInc = k1lib.AutoIncrement(prefix=f"_key_{round(time.time()*1000)}_")
class s3bucketUploader(cli.BaseCli):
    def __init__(self, client, bucket:"s3bucket", key:str):
        """Uploads the data piped in, then returns a :class:`s3obj`.

        :param client: boto3 client
        :param bucket: destination bucket
        :param key: key the uploaded object is stored under"""
        self.client = client; self.bucket = bucket; self.key = key
    def __ror__(self, it):
        if isinstance(it, (bytes, str)):
            # direct path: no need to save to a file first
            body = it if isinstance(it, bytes) else it.encode()
            self.client.put_object(Bucket=self.bucket.name, Key=self.key, Body=body)
        else:
            tmpFile = it | cli.file()
            # remove the temp file even when the upload fails
            try: self.client.upload_file(tmpFile, self.bucket.name, self.key)
            finally: os.remove(tmpFile)
        return self.bucket[self.key]
    def __repr__(self): return f"<s3bucketUploader bucket.name='{self.bucket.name}' key='{self.key}'>"
[docs]
class s3bucket:
    def __init__(self, client, name:str):
        """Represents an S3 bucket.
        Example::

            client = ...
            db = s3(client)
            bucket = db["bucket-name-here"]

        See also: :class:`s3`

        :param client: boto3 client
        :param name: bucket name"""
        self.client = client; self.name = name; self._cachedLs = None
    def _ls(self):
        """Lists every object in the bucket, replaces the cache and returns them as a list."""
        client = self.client; name = self.name; fresh = {}
        # A single list_objects call returns at most one page of keys,
        # so walk all pages, otherwise big buckets get silently truncated
        for page in client.get_paginator("list_objects").paginate(Bucket=name):
            for data in page.get("Contents", []): fresh[data["Key"]] = s3obj(client, name, data)
        self._cachedLs = fresh
        return list(fresh.values())
    def __len__(self):
        if self._cachedLs is None: self._ls()
        return len(self._cachedLs)
    def upload(self, key:str=None):
        """Uploads some content to s3.
        Example::

            bucket = ...
            obj = "abc\\ndef" | bucket.upload()  # uploads some text
            obj = b"abc\\ndef" | bucket.upload() # uploads some bytes

        This works with whatever you can pipe into :class:`~k1lib.cli.output.file`. After uploading,
        you will receive a :class:`s3obj` object.

        :param key: if not specified, will auto generate a key"""
        if key is None: key = _s3bucketUploader_autoInc()
        return s3bucketUploader(self.client, self, key)
    def __repr__(self): return f"<s3bucket name='{self.name}'>"
    def __getitem__(self, key:str):
        """Looks the object up with a HEAD request (the cached listing is not consulted)."""
        res = self.client.head_object(Bucket=self.name, Key=key)
        data = {"Key": key, "LastModified": res["LastModified"], "Size": int(res["ResponseMetadata"]["HTTPHeaders"]["content-length"]), "StorageClass": None}
        return s3obj(self.client, self.name, data)
    def __delitem__(self, key:str):
        self.client.delete_object(Bucket=self.name, Key=key)
        # keep len() honest: drop the deleted key from the cached listing
        if self._cachedLs is not None: self._cachedLs.pop(key, None)
[docs]
class s3obj:
    def __init__(self, client, bucket:str, data):
        """Represents an S3 object. Not intended to be instantiated directly.
        See also: :class:`s3`

        :param client: boto3 client
        :param bucket: name of the bucket holding the object
        :param data: dict with "Key", "LastModified", "Size" and "StorageClass" entries"""
        self.client = client; self.bucket = bucket
        self.key = data["Key"]; self.lastModified = data["LastModified"] | cli.toUnix()
        self.size = data["Size"]; self.storageClass = data["StorageClass"]
    def __repr__(self): return f"<s3obj bucket='{self.bucket}' key='{self.key}' size='{k1lib.fmt.size(self.size)}' lastModified='{self.lastModified | cli.toIso()}'>"
    def _cat(self, kwargs):
        """Reads bytes [sB, eB) of the object.

        :param kwargs: dict with "sB" (start byte), "eB" (end byte, exclusive,
            negative meaning "till the end"), "text" and "chunks" entries
        :return: list of lines if ``text``, batches of bytes if ``chunks``,
            else the raw bytes"""
        sB = kwargs["sB"]; eB = kwargs["eB"]
        if eB < 0: eB = self.size
        if eB > sB: res = self.client.get_object(Bucket=self.bucket, Key=self.key, Range=f'bytes={sB}-{eB-1}')["Body"].read()
        else: res = b"" # empty span (e.g. zero-sized object): "bytes=0--1" is not a valid Range header
        if kwargs["text"]: return res.decode().split("\n")
        if kwargs["chunks"]: return res | cli.batched(settings.cli.cat.chunkSize, True)
        return res
# redis-py, obtained through k1lib.dep (presumably a lazy/optional import wrapper - TODO confirm)
redis = k1lib.dep("redis") # s3obj
# Key generator used by Redis.__call__ for values stored without an explicit key.
# The prefix embeds the time (whole seconds) at which this module was imported
_redisAutoInc = k1lib.AutoIncrement(prefix=f"_redis_msg_{round(time.time())}_") # s3obj
# Registers "settings.cred.redis" with host, port and expires entries, each
# overridable through an env variable. K1_REDIS_EXPIRES is parsed with float(),
# which is what lets the string "inf" mean "never expires".
# NOTE(review): unlike the minio registration earlier in this file, this call is
# not guarded by an "already registered" check - confirm that is intended
settings.cred.add("redis", k1lib.Settings() # s3obj
.add("host", "localhost", "location of the redis server", env="K1_REDIS_HOST") # s3obj
.add("port", 6379, "port the redis server use", env=("K1_REDIS_PORT", int)) # s3obj
.add("expires", 120, "seconds before the message deletes itself. Can be float('inf'), or 'inf' for the env variable", env=("K1_REDIS_EXPIRES", lambda x: float(x.lower()))), # s3obj
"anything related to redis, used by k1lib.cli.lsext.Redis") # s3obj
[docs]
class Redis:
    def __init__(self, host=None, port=None, **kwargs):
        """Connects to Redis server.
        Example::

            r = Redis("localhost", 6379)
            r["abc"] = {"some": "json", "number": 4} # sets value
            r["abc"]                   # reads value, returns python object
            key = r("some message")    # sets value with auto generated key, returns that key
            r[key]                     # reads that value, returns "some message"
            key = r("some message", 60) # sets value with auto generated key, with expires amount, returns that key

        You can actually leave the constructors empty, as it will automatically pull the data from
        "settings.cred.redis.host", which pulls from the env variable "K1_REDIS_HOST". So it can
        be as short as this::

            r = Redis()
            r["abc"] = {"some": "json", "number": 4} # sets value

        Internally, all objects are pickled and unpickled, so you can transport pretty much
        any Python object. Also, by default, each message will expire after the number of
        seconds in the "settings.cred.redis.expires" setting (120 unless changed).

        This is really only meant as a quick way for different servers to communicate
        with each other. If your use case is different, don't use this.

        :param host: host name of the redis server
        :param port: port the redis server uses
        :param kwargs: extra kwargs sent to :class:`redis.Redis`"""
        self.host = host or settings.cred.redis.host; self.port = port or settings.cred.redis.port
        self.client = redis.Redis(host=self.host, port=self.port, **kwargs)
    def __getitem__(self, i):
        """Reads and unpickles the value stored under key ``i``, None if there is none."""
        res = self.client.get(i)
        # NOTE(security): unpickling can execute arbitrary code, so only point
        # this at a redis server whose writers you trust
        return None if res is None else dill.loads(res)
    @staticmethod
    def _expiryKwargs(ex) -> dict:
        """Turns an expiry in seconds into kwargs for ``client.set``: nothing for
        an infinite expiry, else ``ex`` rounded up to whole seconds as an int
        (the env-derived setting is a float, redis-py wants an int)."""
        import math
        if not ex < float("inf"): return {}
        return {"ex": math.ceil(ex)}
    def __setitem__(self, i, v, ex=None):
        """Pickles ``v`` and stores it under key ``i``.

        :param ex: seconds until the value expires, falls back to
            "settings.cred.redis.expires" when not given"""
        ex = ex or settings.cred.redis.expires
        self.client.set(i, dill.dumps(v), **self._expiryKwargs(ex))
    def __call__(self, v, ex=None) -> str:
        """Stores ``v`` under an auto generated key and returns that key."""
        i = _redisAutoInc(); self.__setitem__(i, v, ex); return i
    def __repr__(self): return f"<Redis host='{self.host}:{self.port}'>"