# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
import k1lib, json, base64, threading, time, random, struct, collections, os, sys, tempfile, contextlib, math, functools, inspect; import k1lib.cli as cli; k1 = k1lib
from collections import deque
__all__ = ["log", "aes_encrypt", "aes_decrypt", "aes_encrypt_json", "aes_decrypt_json", "tempObj", "TimeSeries", "speed", "compileCExt"]
_logObj = {"loaded": False, "logMsgs": deque(), "path": None}
def _thTarget(): # _thTarget
import asyncio, base64, json; from k1lib import kws # _thTarget
async def main(): # _thTarget
async with kws.WsClient("wss://ws.logs.mlexps.com/_k1_ingest") as ws: # _thTarget
while True: # _thTarget
if len(_logObj["logMsgs"]) == 0: await asyncio.sleep(0.01) # _thTarget
else: await ws.send(_logObj["logMsgs"].popleft()) # _thTarget
asyncio.new_event_loop().run_until_complete(main()) # _thTarget
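# Design note: log() itself never touches the network. It only appends to the
# _logObj["logMsgs"] deque, and the loop above (started lazily on the first
# log() call) drains that deque over a single shared websocket, so logging stays
# cheap and non-blocking for the caller.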
def log(path:str, obj:"any"): # log
"""Logs random debug statements to logs.mlexps.com server.
Example::
k1.log("ggdrive/topic1", "some message")
k1.log("ggdrive/topic1/sub2", {"some": "json", "object": 2})
# I typically do it like this, so that I can filter down only the messages that I want based on severity
k1.log("ggdrive/info", {"some": "json", "object": 2})
k1.log("ggdrive/error", {"some": "json", "object": 2})
Visit the website https://logs.mlexps.com/watch/ggdrive, or
/watch/ggdrive/topic1, or /watch/ggdrive/topic1/sub2 to view all logs
coming in.""" # log
if not _logObj["loaded"]: _logObj["loaded"] = True; threading.Thread(target=_thTarget).start() # log
if not isinstance(obj, (str, float, int)): # log
obj = base64.b64encode(json.dumps(obj).encode()).decode() # log
_logObj["logMsgs"].append(f"{path}/{obj}") # log
if k1lib.settings.startup.import_optionals: # log
try: # log
from scipy import stats # log
__all__.append("pValue") # log
def pValue(zScore): # log
"""2-sided p value of a particular z score. Requires :mod:`scipy`.""" # log
return stats.norm.sf(abs(zScore))*2 # log
except: pass # log
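# Usage sketch (assumes scipy is installed): pValue() matches the standard
# two-sided normal test, e.g. pValue(1.96) ~= 0.05 and pValue(2.58) ~= 0.0099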
try: # log
Crypto = k1lib.dep("Crypto", "pycryptodome", url="https://pycryptodome.readthedocs.io/en/latest/") # log
def aes_encrypt(plaintext:bytes) -> str: Crypto.Cipher # log
def aes_decrypt(ciphertext:str) -> bytes: Crypto.Cipher # log
def aes_encrypt_json(obj:dict) -> str: Crypto.Cipher # log
def aes_decrypt_json(ciphertext:str) -> dict: Crypto.Cipher # log
from Crypto.Cipher import AES # log
from Crypto.Random import get_random_bytes # log
from Crypto.Util.Padding import pad, unpad # log
def aes_encrypt(plaintext:bytes, key:bytes=None) -> str: # log
"""Encrypts a message using AES.
Example::
res = k1.aes_encrypt(b"some message") # can return '3HV7PKKQL2DLWQWBBTETQTXNMC4Q6DJ2FSS73A7NCRAX6K4ZZKXQ===='
k1.aes_decrypt(res) # returns b"some message"
After encrypting, this is encoded using base32, ready to be used in urls. This function
is a convenience function meant for small messages here and there, and is not intended
for heavy duty encryption.
The key is automatically generated on library load, and is configurable via ``settings.cred.aes.key``.
See also: :meth:`aes_encrypt_json`
:param plaintext: plaintext to encrypt
:param key: 128 bit key. If not specified, uses the key auto-generated on library load at ``settings.cred.aes.key``""" # log
if not isinstance(plaintext, bytes): plaintext = f"{plaintext}".encode() # log
cipher = AES.new(key or k1lib.settings.cred.aes.key, AES.MODE_CBC); ciphertext = cipher.encrypt(pad(plaintext, AES.block_size)) # log
return base64.b32encode(cipher.iv + ciphertext).decode().replace(*"/_").replace(*"+-") # log
def aes_decrypt(ciphertext:str, key:bytes=None) -> bytes: # log
"""Decrypts a message using AES.
See :meth:`aes_encrypt` for more information.
:param ciphertext: ciphertext to decrypt
:param key: 128 bit key. If not specified, uses the key auto-generated on library load at ``settings.cred.aes.key``""" # log
ciphertext = base64.b32decode(ciphertext.replace(*"-+").replace(*"_/").encode()); iv = ciphertext[:AES.block_size]; cipher = AES.new(key or k1lib.settings.cred.aes.key, AES.MODE_CBC, iv) # log
return unpad(cipher.decrypt(ciphertext[AES.block_size:]), AES.block_size) # log
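# Round-trip sketch (assumes pycryptodome is installed). AES.new() picks a fresh
# random IV per call, so the same plaintext encrypts to different tokens, yet
# decryption always recovers it. Keys can also be passed explicitly:
#   key = get_random_bytes(16)
#   token = aes_encrypt(b"hello", key)          # url-safe base32 string
#   assert aes_decrypt(token, key) == b"hello"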
def aes_encrypt_json(obj:dict) -> str: # log
"""Encrypts a Python object using AES.
Example::
a = k1.aes_encrypt_json({"a": 3})
k1.aes_decrypt_json(a) # returns {"a": 3}
k1.aes_decrypt_json(k1.aes_encrypt_json([1, 2, 3])) # returns [1, 2, 3]
k1.aes_decrypt_json(k1.aes_encrypt_json("abc")) # returns "abc"
See also: :meth:`aes_encrypt`""" # log
return aes_encrypt(json.dumps(obj).encode()) # log
def aes_decrypt_json(ciphertext:str) -> dict: # log
return json.loads(aes_decrypt(ciphertext).decode()) # log
k1lib.settings.cred.add("aes", k1lib.Settings().add("key", get_random_bytes(16), "16-byte aes key, used in aes_encrypt() and aes_decrypt()", sensitive=True), "anything related to AES block cipher") # log
except: pass # log
k1lib.settings.add("tempObjLifetime", 60, "Default lifetime in seconds used in k1.tempObj()"); # log
_tempObjs = {}; _tempTimeouts = {}; _tempObj_autoInc = k1lib.AutoIncrement(prefix="_k1_tempObj_") # log
def tempObj(x, timeout=None): # tempObj
"""Stores an object that's meant to exist for a short amount of time,
and then will be automatically deleted. Example::
key = k1.tempObj("Suika Ibuki", 10) # stores some string that will only last for 10 seconds
k1.tempObj(key) # returns "Suika Ibuki"
time.sleep(20)
k1.tempObj(key) # returns None
The default timeout value is 60 seconds, configurable in :data:`~k1lib.settings`.tempObjLifetime""" # tempObj
if isinstance(x, str) and x.startswith("_k1_tempObj_"): return _tempObjs.get(x, None) # tempObj
else: # tempObj
k = _tempObj_autoInc() # tempObj
if timeout is None: timeout = k1lib.settings.tempObjLifetime # tempObj
_tempObjs[k] = x; _tempTimeouts[k] = time.time() + timeout; return k # tempObj
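# Key format note: keys come from AutoIncrement(prefix="_k1_tempObj_"), so they
# look like "_k1_tempObj_0", "_k1_tempObj_1", etc. tempObj() distinguishes
# lookups from stores purely by that prefix, meaning a stored string that itself
# starts with "_k1_tempObj_" would be misinterpreted as a lookup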
def tempCleanupThread(): # tempCleanupThread
while True: # tempCleanupThread
now = time.time() # tempCleanupThread
for k,v in list(_tempTimeouts.items()): # tempCleanupThread
if now > v: del _tempObjs[k]; del _tempTimeouts[k] # tempCleanupThread
time.sleep(1) # tempCleanupThread
threading.Thread(target=tempCleanupThread, daemon=True).start() # tempCleanupThread
_time = time.time; _timeSeriesD = {}; _timeSeriesID = {}; _timeSeriesAutoInc = k1.AutoIncrement(prefix="_k1_ts_"); _timeSeriesIdxAutoInc = k1.AutoIncrement() # tempCleanupThread
class TimeSeries: # TimeSeries
def __init__(self, name:str=None, fn:str=None, storeRaw:bool=True, retention:int=7*86400, coldStore:bool=False): # TimeSeries
"""Manages time series data, compresses them, back them up on disk if necessary.
Example::
ts1 = k1.TimeSeries(name="ts1")
ts1.append(3, 4, 5) # do this anywhere you'd like. This saves 1 data point containing 3 floats to the sqlite database
for i in range(600): # deposits 600 samples over 1 minute time span
ts1.append(random.random(), random.random(), random.random())
time.sleep(0.1)
ts1.getRaw() # returns something like [[1737213223.4139452, (3, 4, 5)], ...]
ts1.getRate() # returns something like [(1737213313.0752494, 10.066128035852211), ...]
ts1.getPert() # returns something like [[1737213568.9260075, [(0.009, 0.07, 0.47, 0.89, 0.99), (0.001, 0.11, 0.56, 0.90, 0.99), (0.0006, 0.08, 0.46, 0.89, 0.99)]], ...]
For :meth:`getRate`, first number is the timestamp, second is the number of data points/second.
For :meth:`getPert`, this will return the percentiles of the input data (0%, 10%, 50%, 90%, 100%) for each variable
Why does this functionality exist? Well, it's because managing time series usually involves
a lot of pain. You need to set up a time series database like Prometheus, or PostgreSQL to keep
things simple. But setting up all that infrastructure takes a lot of effort, and again, if it's
hard, you won't do it, or will be incentivised not to do it. So this class is meant to be an
object that manages time series data. It manages it in such a way that you can spam this
all over the place and get lots of functionality right out of the box, without an external
server. All data is stored in several tables inside a sqlite file. Each time series gets its
own sqlite file. Some performance numbers to keep in mind:
- Data write speed: 100k data points/s
- Data read speed: 400k data points/s
- Disk space used: 50 bytes/data point for small amounts of variables (say ~3)
Other features include the ability to auto delete old data so that it doesn't accumulate over time.
When old data is deleted, there's also an option to save the deleted data in a separate file
for cold storage, which is more efficient storage-wise than sqlite, but harder to access.
Cold storage space used: 14 + nVars * 4 bytes/data point, e.g. 26 bytes for 3 variables. This is 5x smaller than sqlite for 3 variables
There will be scans every 10 seconds on another thread that compress the raw data into a
usable form. If there are too few data points (<20 data points), then it will skip that scan
cycle. Data (refined and raw) will be saved into a sqlite database, stored at the specified
file name ``fn``. If no file name is specified, then the database is kept in memory
instead (see the performance note below).
For every method, you can also specify an index number::
ts1 = k1.TimeSeries(name="ts1")
ts1.appendIdx(2, 3, 4, 5) # index 2 with 3 values (3, 4, 5)
ts1.getRaw(idx=2) # returns all data points with idx=2, something like [[1737213223.4139452, (3, 4, 5)], ...]
ts1.getPert(idx=2) # returns all percentiles with idx=2, something like [[1737213568.9260075, [(0.009, 0.07, 0.47, 0.89, 0.99), (0.001, 0.11, 0.56, 0.90, 0.99), (0.0006, 0.08, 0.46, 0.89, 0.99)]], ...]
This is useful in situations like when you have lots of sensors for different devices. Then
each idx can be the device id, and so you can store lots of variables in 1 go for 1 specific
device. Then you can query the time series later on for 1 specific device only.
You can get all TimeSeries via :meth:`allData`.
.. note::
Performance details
This is a more detailed section going over what actually happens underneath in case you
care about performance. Everything is stored in sqlite. If file name is not given, then
it still uses sqlite, but in memory instead.
When a new .append() happens, no database interaction happens at all. The data point is simply
appended to a totally normal, internal list, so <1us. Then there are 2 background
threads to help collate and store the data. One is fast (10s scan) and one is slow
(60s scan). The fast one distributes new data points to 3 internal stacks, "rawRaw",
"rateRaw" and "pertD". If rawRaw has any elements, it will be stored in sqlite. If
rateRaw has at least 10 elements, it will calculate the rate and store in sqlite. If
pertD (indexed by idx) has at least 100 elements, it will calculate percentiles and
store in sqlite.
This architecture has several ramifications:
* Time taken to execute .append() is very fast
* If no .append() calls are made for a long time, no sqlite queries will be executed
* If too few .append() calls are made, data might not show up in rate and percentile views at all
* You might have to wait for up to 10 seconds before .getRaw() has the newest data
* If the Python process exits, there may still be data in memory that hasn't been recorded to sqlite yet
For the second loop, it grabs all rows from sqlite that are older than ``retention``
seconds, compresses them into an efficient binary format, then appends to the cold
storage file. My code is not as bulletproof as sqlite, but still works fine. Because
the loop is very slow, it shouldn't affect performance much.
:param name: just cosmetic, to remind you what this does
:param fn: sqlite file name to store this time series data. If not specified, then stores database in memory
:param storeRaw: whether to store raw data points
:param retention: seconds before deleting old data point permanently, default 1 week
:param coldStore: if True, when data points past retention time, it will be
packed into a single binary file for cold storage""" # TimeSeries
if coldStore and fn is None: raise Exception("If using cold storage, you have to specify a file name!") # TODO: ts1 | aS(repr), ts1 | toHtml() # TimeSeries
self._initialized = False; self.name = name or _timeSeriesAutoInc(); self.idx = _timeSeriesIdxAutoInc(); self.fn = fn; self.storeRaw = storeRaw; self.retention = retention; self.coldStore = coldStore; self.dbLock = threading.Lock() # TimeSeries
if "/" in self.name: raise Exception("Can't have forward slash in the name") # TimeSeries
if self.name in _timeSeriesD: raise Exception(f"Name '{self.name}' has appeared before. Please use a different name") # TimeSeries
self._raw = []; self._rawRaw = []; self._rateRaw = []; _timeSeriesD[self.name] = _timeSeriesID[self.idx] = self; self._setupDb(); self._pertD = collections.defaultdict(lambda: []); self._initialized = True # maps idx -> [time, values] # TimeSeries
def _setupDb(self): # TimeSeries
fn = self.fn # TimeSeries
if fn is not None and os.path.exists(f"{fn}.db"): self._s = s = cli.sql(f"{fn}.db", mode="lite")["default"]; self._dbRaw = s["raw"]; self._dbRate = s["rate"]; self._dbPert = s["pert"] # TimeSeries
else: # TimeSeries
self._s = s = cli.sql(":memory:" if fn is None else f"{fn}.db", mode="lite")["default"]; s.query("CREATE TABLE rate (id INTEGER PRIMARY KEY AUTOINCREMENT, time INTEGER, rate REAL);"); s.query("CREATE INDEX rate_time ON rate (time);"); # TimeSeries
s.query("CREATE TABLE raw (id INTEGER PRIMARY KEY AUTOINCREMENT, time INTEGER, idx INTEGER, data BLOB);"); s.query("CREATE INDEX raw_time ON raw (time);"); s.query("CREATE INDEX raw_idx ON raw (idx);"); # .data is struct.pack("ffff", values) # TimeSeries
s.query("CREATE TABLE pert (id INTEGER PRIMARY KEY AUTOINCREMENT, time INTEGER, idx INTEGER, data BLOB);"); s.query("CREATE INDEX pert_time ON pert (time);"); s.query("CREATE INDEX pert_idx ON pert (idx);"); # .data is struct.pack of [*n*[0, 10, 50, 90, 100]] # TimeSeries
while s["pert"] is None: print("."); time.sleep(0.1) # TimeSeries
self._dbRaw = s["raw"]; self._dbPert = s["pert"]; self._dbRate = s["rate"] # TimeSeries
@staticmethod # TimeSeries
def allData(): return _timeSeriesD # TimeSeries
@staticmethod # TimeSeries
def allIData(): return _timeSeriesID # TimeSeries
@k1.cache(timeout=10, name="ts_idxs", docs="caches k1.TimeSeries idxs, aka all available idx") # TimeSeries
def idxs(self) -> "list[int]": # TimeSeries
"""Grabs all available idxs""" # TimeSeries
if self.storeRaw: return [x[0] for x in self._dbRaw.query("select distinct idx from raw")] # TimeSeries
return [x[0] for x in self._dbPert.query("select distinct idx from pert")] # TimeSeries
def append(self, *values): sig = "f"*len(values); self._raw.append([_time(), 0, values, struct.pack(sig, *values)]); return values # TimeSeries
def appendIdx(self, idx, *values): sig = "f"*len(values); self._raw.append([_time(), idx, values, struct.pack(sig, *values)]); return values # TimeSeries
def getRaw(self, startTime:int=None, stopTime:int=None, idx:int=0, limit:int=1000000): # TimeSeries
"""Grabs raw data of this time series. Returns something like ``[[1737213223.4139452, (3, 4, 5)], ...]`` """ # TimeSeries
if not self.storeRaw: raise Exception(".storeRaw is False, so all raw data has been deleted") # TimeSeries
s = f"select time, data from raw where idx = {idx}" # TimeSeries
if startTime: s += f" and time >= {startTime}" # TimeSeries
if stopTime: s += f" and time < {stopTime}" # TimeSeries
s += f" order by time limit {limit}"; data = self._dbRaw.query(s) # TimeSeries
if len(data) == 0: return [] # TimeSeries
s = (len(data[0][1])//4)*"f"; res = [] # TimeSeries
try: # fast way, caches "s" computation # TimeSeries
for t, vs in data: res.append([t, struct.unpack(s, vs)]) # TimeSeries
except: # slow way, in case the number of variables differs between rows # TimeSeries
for t, vs in data: res.append([t, struct.unpack((len(vs)//4)*"f", vs)]) # TimeSeries
return res # TimeSeries
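# Blob format note: each raw row's data column is struct.pack("f"*n, *values),
# so n is recovered as len(blob)//4. The fast path above assumes every row has
# the first row's n and falls back to per-row unpacking when one doesn't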
def getRate(self, startTime:int=None, stopTime:int=None, limit:int=10000): # TimeSeries
"""Grabs data ingest rate of this time series. Returns something like ``[(1737213313.0752494, 10.066128035852211), ...]``""" # TimeSeries
s = f"select time, rate from rate where true" # TimeSeries
if startTime: s += f" and time >= {startTime}" # TimeSeries
if stopTime: s += f" and time < {stopTime}" # TimeSeries
s += f" order by time limit {limit}"; data = self._dbRate.query(s) # TimeSeries
return data # TimeSeries
def getPert(self, startTime:int=None, stopTime:int=None, idx:int=0, limit:int=10000): # TimeSeries
"""Grabs data percentiles of this time series. Returns something like ``[[1737213568.9260075, [(0.009, 0.07, 0.47, 0.89, 0.99), (0.001, 0.11, 0.56, 0.90, 0.99), (0.0006, 0.08, 0.46, 0.89, 0.99)]], ...]``""" # TimeSeries
def _batched(x): return [x[5*i:5*(i+1)] for i in range(len(x)//5)] # TimeSeries
s = f"select time, data from pert where idx = {idx}" # TimeSeries
if startTime: s += f" and time >= {startTime}" # TimeSeries
if stopTime: s += f" and time < {stopTime}" # TimeSeries
s += f" order by time limit {limit}"; data = self._dbPert.query(s) # TimeSeries
if len(data) == 0: return [] # TimeSeries
nFloats = len(data[0][1])//4; s = nFloats*"f"; res = [] # nFloats = 5 percentiles * number of variables # TimeSeries
try: # fast way, caches "s" computation # TimeSeries
for t, vs in data: res.append([t, _batched(struct.unpack(s, vs))]) # TimeSeries
except: # slow way, in case the number of variables differs between rows # TimeSeries
for t, vs in data: res.append([t, _batched(struct.unpack((len(vs)//4)*"f", vs))]) # TimeSeries
return res # TimeSeries
def _ls(self): return self.getRaw(limit=100) # TimeSeries
@k1.cache(timeout=60, maxsize=1000, name="ts_len", docs="caches k1.TimeSeries lengths") # TimeSeries
def __len__(self): # TimeSeries
if not self.storeRaw: raise Exception(f"TimeSeries '{self.name}'.storeRaw is False, no length available") # TimeSeries
res = self._dbRaw.query("""select max(id) from raw limit 1;"""); return (res[0][0] or 0) if len(res) > 0 else 0 # max(id) returns [(None,)] on an empty table # TimeSeries
def __repr__(self): return f"<TimeSeries name='{self.name}' fn='{self.fn}' storeRaw={self.storeRaw} retention={self.retention} coldStore={self.coldStore}>" # TimeSeries
def plotRaw(self, startTime, endTime, idx=0, dIdx="all", window=1): # TimeSeries
import matplotlib.pyplot as plt; s = self; data = s.getRaw(startTime, endTime, idx=idx) | ~cli.apply(lambda x,y: [[x, e] for e in y]) | cli.T() | cli.apply(cli.T() | (cli.apply(cli.window(10) | cli.toMean().all()) if window > 1 else cli.iden())) | cli.deref() # TimeSeries
with k1.mplLock: # TimeSeries
if len(data) == 0: return "No data available" # TimeSeries
if dIdx == "all": data | cli.apply(~cli.aS(plt.dplot, 7, True, ".-")) | cli.ignore() # TimeSeries
else: data | cli.rItem(dIdx) | ~cli.aS(plt.dplot, 7, True, ".-") # TimeSeries
plt.title(f"Raw '{self.name}', idx {idx}, dIdx {dIdx}, window {window}"); plt.tight_layout(); im = plt.gcf() | cli.toImg() # TimeSeries
return im | cli.toHtml() # TimeSeries
@staticmethod # TimeSeries
def splotRaw(name, startTime, endTime, idx=0, dIdx="all", window=1): # TimeSeries
d = k1.TimeSeries.allData(); s = d.get(name, None) # TimeSeries
if s is None: return f"No TimeSeries with the name '{name}' found" # TimeSeries
return s.plotRaw(startTime, endTime, idx=idx, dIdx=dIdx, window=window) # TimeSeries
def plotRate(self, startTime, endTime, window=1): # TimeSeries
import matplotlib.pyplot as plt; s = self; data = s.getRate() | (cli.T.wrap(cli.apply(cli.window(window) | cli.toMean().all())) if window > 1 else cli.iden()) | cli.T() | cli.deref() # TimeSeries
with k1.mplLock: # TimeSeries
if len(data) == 0: return "No data available" # TimeSeries
data | ~cli.aS(plt.dplot, 7, True, ".-"); plt.xlabel("Time"); plt.ylabel("Rate (calls/s)"); plt.title(f"Rate '{self.name}', window {window}"); plt.tight_layout(); im = plt.gcf() | cli.toImg() # TimeSeries
return im | cli.toHtml() # TimeSeries
@staticmethod # TimeSeries
def splotRate(name, startTime, endTime, window=1): # TimeSeries
d = k1.TimeSeries.allData(); s = d.get(name, None) # TimeSeries
if s is None: return f"No TimeSeries with the name '{name}' found" # TimeSeries
return s.plotRate(startTime, endTime, window=window) # TimeSeries
def plotPert(self, startTime, stopTime, idx=0, dIdx=0, window=1): # TimeSeries
import matplotlib.pyplot as plt; s = self; data = s.getPert(startTime, stopTime, idx) | cli.apply(lambda x: x[dIdx], 1) | ~cli.apply(lambda x,y: [[x, e] for e in y]) | cli.T() | cli.apply(cli.T() | (cli.apply(cli.window(window) | cli.toMean().all()) if window > 1 else cli.iden())) | cli.deref() # TimeSeries
with k1.mplLock: # TimeSeries
if len(data) == 0: return "No data available" # TimeSeries
data | cli.apply(~cli.aS(plt.dplot, 7, True, ".-")) | cli.ignore(); plt.legend(["0%", "10%", "50%", "90%", "100%"]); plt.xlabel("Time"); plt.ylabel("Value"); plt.title(f"Percentile '{self.name}', idx {idx}, dIdx {dIdx}, window {window}"); plt.tight_layout(); im = plt.gcf() | cli.toImg() # TimeSeries
return im | cli.toHtml() # TimeSeries
@staticmethod # TimeSeries
def splotPert(name, startTime, stopTime, idx=0, dIdx=0, window=1): # TimeSeries
d = k1.TimeSeries.allData(); s = d.get(name, None) # TimeSeries
if s is None: return f"No TimeSeries with the name '{name}' found" # TimeSeries
return s.plotPert(startTime, stopTime, idx=idx, dIdx=dIdx, window=window) # TimeSeries
@staticmethod # TimeSeries
def flask(app, **kwargs): # TimeSeries
"""Attaches a TimeSeries management plane to a flask app.
Example::
app = flask.Flask(__name__)
k1.TimeSeries.flask(app)
app.run(host="0.0.0.0", port=80)
Then, you can access the route "/k1/ts" to see an overview of all TimeSeries
:param app: flask app object
:param kwargs: extra random kwargs that you want to add to ``app.route()`` function""" # TimeSeries
bootstrapJs = """
async function dynamicLoad(selector, endpoint, rawHtml=null) { // loads a remote endpoint containing html and put it to the selected element. If .rawHtml is available, then don't send any request, and just use that html directly
const elem = document.querySelector(selector); elem.innerHTML = rawHtml ? rawHtml : (await (await fetch(endpoint)).text());
await new Promise(r => setTimeout(r, 100)); let currentScript = "";
try { for (const script of elem.getElementsByTagName("script")) { currentScript = script.innerHTML; eval(script.innerHTML); }
} catch (e) { console.log(`Error encountered: `, e, e.stack, currentScript); }
}"""; bootstrapHtml = f"""
<head>
<meta charset="UTF-8"><title>DHCP low level server</title><meta name="viewport" content="width=device-width, initial-scale=1.0">
<link href="https://static.aigu.vn/daisyui.css" rel="stylesheet" type="text/css" />
<style>
h1 {{ font-size: 2.25rem !important; line-height: 2.5rem !important; }}
h2 {{ font-size: 1.5rem !important; line-height: 2rem !important; margin: 10px 0px !important; }}
h3 {{ font-size: 1.125rem !important; line-height: 1.75rem !important; margin: 6px 0px !important; }}
textarea {{ border: 1px solid; padding: 8px 12px !important; border-radius: 10px !important; }}
body {{ padding: 12px; }}
</style><script>{bootstrapJs}</script>
</head>"""; tss = k1.TimeSeries.allData() # TimeSeries
# time series stuff # TimeSeries
@app.route("/k1/ts", **kwargs) # TimeSeries
def ts_index(): # TimeSeries
pre = cli.init._jsDAuto(); ui1 = tss.items() | ~cli.apply(lambda k,v: [k, v.fn, v.storeRaw, v.retention, v.coldStore, v | (cli.tryout() | cli.aS(len))]) | cli.deref() | (cli.toJsFunc("term") | cli.grep("${term}") | k1.viz.Table(["name", "fn", "storeRaw", "retention", "coldStore", "#hits"], height=600, onclickFName=f"{pre}_select", selectable=True, sortF=True)) | cli.op().interface() | cli.toHtml() # TimeSeries
return f"""{bootstrapHtml}<h1>TimeSeries</h1><div style="overflow-x: auto; margin-top: 12px">{ui1}</div><div id="{pre}_details"></div>
<script>\nfunction {pre}_select(row, i, e) {{ dynamicLoad("#{pre}_details", `/k1/ts/${{row[0]}}/fragment`); }}</script>""" # TimeSeries
@app.route("/k1/ts/<name>/fragment", **kwargs) # TimeSeries
def ts_name(name): # TimeSeries
s = tss.get(name, None) # TimeSeries
if s is None: return f"No TimeSeries '{name}' found" # TimeSeries
idxs = s.idxs(); pre = cli.init._jsDAuto(); ui1 = idxs | (cli.toJsFunc("idx") | cli.grep("${idx}") | cli.batched(10, True) | k1.viz.Table(sortF=True)) | cli.op().interface() | cli.toHtml(); return f"""
<h2>TimeSeries '{s.name}'</h2><div style="border: 1px solid black; padding: 8px">{ui1}</div>
<div style="display: grid; grid-template-columns: auto auto; row-gap: 8px; column-gap: 8px; align-items: center; margin-top: 12px; width: fit-content">
<div>idx: </div><input id="{pre}_idx" class="input input-bordered" value="0" />
<div>dIdx: </div><input id="{pre}_dIdx" class="input input-bordered" value="0" />
<div>timeStr: </div><input id="{pre}_timeStr" class="input input-bordered" value="1 day" />
<div>window: </div><input id="{pre}_window" class="input input-bordered" value="1" />
</div><button id="{pre}_graphBtn" class="btn" style="margin-top: 8px">Graph</button><div id="{pre}_detailsRaw"></div><div id="{pre}_detailsRate"></div><div id="{pre}_detailsPert"></div>
<script>
(async () => {{
let dS = (x) => document.querySelector(x); dS("#{pre}_graphBtn").onclick = async () => {{
let idx = dS("#{pre}_idx").value; let dIdx = dS("#{pre}_dIdx").value; let window = dS("#{pre}_window").value; let timeStr = dS("#{pre}_timeStr").value;
dynamicLoad("#{pre}_detailsRaw", `/k1/ts/{name}/fragment/${{idx}}/raw/${{dIdx}}/${{window}}/${{timeStr}}`);
dynamicLoad("#{pre}_detailsRate", `/k1/ts/{name}/fragment/0/rate/0/${{window}}/${{timeStr}}`);
if (dIdx !== "all") dynamicLoad("#{pre}_detailsPert", `/k1/ts/{name}/fragment/${{idx}}/pert/${{dIdx}}/${{window}}/${{timeStr}}`);
}}
}})();
</script>""" # TimeSeries
@app.route("/k1/ts/<name>/fragment/<int:idx>/raw/<dIdx>/<int:window>/<timeStr>", **kwargs) # TimeSeries
def ts_raw(name, idx, dIdx, window, timeStr): return k1.TimeSeries.splotRaw(name, *k1.parseTimeStr(timeStr), idx, "all" if dIdx == "all" else int(dIdx), window) # TimeSeries
@app.route("/k1/ts/<name>/fragment/0/rate/0/<int:window>/<timeStr>", **kwargs) # TimeSeries
def ts_rate(name, window, timeStr): return k1.TimeSeries.splotRate(name, *k1.parseTimeStr(timeStr), window) # TimeSeries
@app.route("/k1/ts/<name>/fragment/<int:idx>/pert/<int:dIdx>/<int:window>/<timeStr>", **kwargs) # TimeSeries
def ts_pert(name, idx, dIdx, window, timeStr): return k1.TimeSeries.splotPert(name, *k1.parseTimeStr(timeStr), idx, "all" if dIdx == "all" else int(dIdx), window) # TimeSeries
def dummyLoad(self, niter=600, frac=0.1): # TimeSeries
"""Simulates a dummy load on this time series. Read source of
this method to understand what it does.
:param niter: number of iterations
:param frac: fraction of new random numbers and extra time to wait. 0 for more deterministic, 1 for more chaotic""" # TimeSeries
a = random.random(); b = random.random(); c = random.random() # TimeSeries
for i in range(niter) | cli.tee().crt(): # TimeSeries
a += (random.random()-0.5)*frac; b += (random.random()-0.5)*frac; c += (random.random()-0.5)*frac # TimeSeries
self.append(a, b, c); print(self.getRaw() | cli.shape(0), end=""); time.sleep(0.1 + random.random()*frac) # TimeSeries
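# Usage sketch (interactive): spin up a throwaway series and watch the
# background threads collate it:
#   ts = k1.TimeSeries(name="demo")
#   ts.dummyLoad(niter=100)  # ~10-15s of appends at roughly 7-10 points/s
#   ts.getRate()             # populates once the 10s fast-scan cycle has run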
@k1.cron(delay=11, daemon=True, delayedStart=5, name="ts_main", docs="k1.TimeSeries fast scan thread") # TimeSeries
def _timeSeriesThread(): # _timeSeriesThread
for name, ts in _timeSeriesD.items(): # _timeSeriesD maps name -> TimeSeries # _timeSeriesThread
if not ts._initialized: continue # _timeSeriesThread
now = time.time() # raw format: [time, idx, values, pack values] # _timeSeriesThread
if len(ts._raw) > 0: # transfer new raw data to other buffers to be processed later # _timeSeriesThread
_raw = ts._raw; ts._raw = []; ts._rateRaw.extend(_raw); ts._rawRaw.extend(_raw) # _timeSeriesThread
for t,idx,vs,hvs in _raw: ts._pertD[idx].append([t, vs]) # _timeSeriesThread
n = len(ts._rateRaw) # _timeSeriesThread
if n > 10: # more than 10 data points before collating (the "60 seconds passed and no more data" case isn't implemented yet) # _timeSeriesThread
_rateRaw = ts._rateRaw; ts._rateRaw = [] # _timeSeriesThread
_max = max(x[0] for x in _rateRaw); _min = min(x[0] for x in _rateRaw); deltaT = _max - _min # _timeSeriesThread
if deltaT > 0: ts._dbRate.insert(time=_min, rate=len(_rateRaw)/deltaT) # guard against a zero time span # _timeSeriesThread
for idx, tvs in list(ts._pertD.items()): # _timeSeriesThread
if len(tvs) > 100: # _timeSeriesThread
ts._pertD[idx] = []; n = len(tvs); data = tvs | cli.cut(1) | cli.T() | cli.sort(None).all() | cli.apply(lambda vs: [vs[0], vs[n//10], vs[n//2], vs[9*n//10], vs[-1]]) | cli.joinSt() | cli.aS(list) # _timeSeriesThread
ts._dbPert.insert(time=tvs[0][0], idx=idx, data=struct.pack("f"*len(data), *data)) # _timeSeriesThread
if ts.storeRaw and len(ts._rawRaw) > 0: # _timeSeriesThread
rr = ts._rawRaw; ts._rawRaw = [] # _timeSeriesThread
ts._dbRaw.query(f"""INSERT INTO raw ( time, idx, data ) VALUES """ + ", ".join(f"({t}, {idx}, ?)" for t,idx,vs,hvs in rr), *[hvs for t,idx,vs,hvs in rr]) # _timeSeriesThread
@k1.cron(delay=61, daemon=True, delayedStart=11, name="ts_retention", docs="k1.TimeSeries slow scan thread for retention control") # _timeSeriesThread
def _timeSeriesRetentionThread(): # scans to see whether there's old outdated data in sqlite, and delete them and store in cold storage # _timeSeriesRetentionThread
for name, ts in _timeSeriesD.items(): # _timeSeriesRetentionThread
if not ts._initialized: continue # _timeSeriesRetentionThread
now = time.time(); data = ts._dbRaw.query("select time from raw order by time limit 1") # _timeSeriesRetentionThread
if len(data) == 0: continue # _timeSeriesRetentionThread
beginTime = data[0][0] # _timeSeriesRetentionThread
if now - beginTime > ts.retention: # _timeSeriesRetentionThread
t = now - ts.retention # timestamp to slice off # _timeSeriesRetentionThread
if ts.coldStore: data = ts._dbRaw.query(f"select time, idx, data from raw where time < {t} order by time") # _timeSeriesRetentionThread
ts._dbRaw .query(f"delete from raw where time < {t}"); ts._dbPert.query(f"delete from pert where time < {t}"); ts._dbRate.query(f"delete from rate where time < {t}") # _timeSeriesRetentionThread
if ts.coldStore: data | ~cli.apply(lambda t,idx,data: struct.pack("di", t, idx) + data) | cli.apply(lambda x: b"\x00" + struct.pack("B", len(x)+2) + x) >> cli.file(f"{ts.fn}.cold") # _timeSeriesRetentionThread
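# Cold storage record layout, read off the pack calls above: each record is
# b"\x00", a uint8 holding the record's total length, then float64 time, int32
# idx, and 4 bytes per float32 value (hence 14 + nVars*4 bytes/data point).
# A hedged reader sketch; _readColdStore is hypothetical, not a library API:
def _readColdStore(fn:str):
    out = []
    with open(fn, "rb") as f:
        while (hdr := f.read(2)):                       # b"\x00" + length byte
            payload = f.read(hdr[1] - 2)                # rest of this record
            t, idx = struct.unpack("di", payload[:12])  # float64 time, int32 idx
            out.append([t, idx, struct.unpack("f"*((len(payload)-12)//4), payload[12:])])
    return out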
_speedAutoInc = k1.AutoIncrement(prefix="_k1_speed_"); _speedData = {} # Dict[name -> {name, docs, func, ts}] # _timeSeriesRetentionThread
class speed(cli.BaseCli): # speed
def __init__(self, name=None, fn=None, docs=None, coldStore=False): # speed
"""Tracks and benchmarks certain functions, and monitor them through time
with reports in order to deploy them absolutely everywhere. Example::
@k1.speed(name="some func description")
def f(x):
return x*3
You can get a list of all speed k1.TimeSeries objects via ``k1.TimeSeries.allData``
:param name: optional name to show up in :meth:`allData`
:param fn: file name. If specified, will store speed data in sqlite database
at this path, else store in memory
:param docs: optional docs to show up in :meth:`allData`
:param coldStore: if True, stores old speed data in condensed binary file. See more at :class:`TimeSeries`""" # speed
self.name = name or _speedAutoInc(); self.fn = fn; self.docs = docs; self.coldStore = coldStore # speed
if "/" in self.name: raise Exception("Can't have forward slash in the name") # speed
def __call__(self, f): # speed
if self.name in _speedData: raise Exception(f"Name '{self.name}' has appeared before. Please use a different name") # speed
_speedData[self.name] = self.obj = {"name": self.name, "docs": self.docs, "func": f, "ts": k1.TimeSeries(name=f"speed: {self.name}", fn=self.fn, coldStore=self.coldStore)} # speed
ts = self.obj["ts"]; _time = time.time # speed
def wrapper(*args, **kwargs): beginTime = _time(); res = f(*args, **kwargs); duration = _time() - beginTime; ts.append(duration); return res # speed
functools.update_wrapper(wrapper, f); return wrapper # speed
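# Usage sketch: the wrapper above appends each call's duration (in seconds) as a
# 1-variable data point, so latency percentiles come for free once the ~10s
# collation cycle has run:
#   @k1.speed(name="json parse")
#   def parse(s): return json.loads(s)
#   for _ in range(1000): parse('{"a": 1}')
#   k1.speed.allData()["json parse"]["ts"].getPert()  # latency percentiles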
@staticmethod # speed
def allData(): return _speedData # speed
@staticmethod # speed
def flask(app, **kwargs): # speed
"""Attaches a speed management plane to a flask app.
Example::
app = flask.Flask(__name__)
k1.speed.flask(app)
app.run(host="0.0.0.0", port=80)
Then, you can access the route "/k1/speed" to see an overview of all speed
benchmarks. However, doing ``k1.TimeSeries.flask(app)`` and accessing "/k1/ts"
would be more beneficial, as that contains all the graphs and data
:param app: flask app object
:param kwargs: extra random kwargs that you want to add to ``app.route()`` function""" # speed
@app.route("/k1/speed", **kwargs) # speed
def index(): # speed
d = k1.speed.allData(); ui1 = d.items() | ~cli.apply(lambda k,v: [k, v["func"].__name__, inspect.getfile(v["func"]), v["ts"].name, v["docs"]]) | cli.deref() | (cli.toJsFunc("term") | cli.grep("${term}") | k1.viz.Table(["name", "func's name", "func's file name", "TimeSeries name", "docs"], height=600, sortF=True)) | cli.op().interface() | cli.toHtml() # speed
return f"""<h1>Speed</h1><div style="overflow-x: auto">{ui1}</div>""" # speed
@contextlib.contextmanager # speed
def idenContext(): yield True # idenContext
_cextMods = {}; k1.settings.add("cExt", k1.Settings().add("includes", ["fstream", "iostream", "sstream", "mutex", "string", "vector", "cmath", "random"], "header files to include"), "k1.compileCExt()-related settings"); # idenContext
def compileCExt(cppCode, moduleName, verbose=False): # compileCExt
"""Conveniently compiles a python C extension module and returns it.
Example::
mod = k1.compileCExt(\"\"\"
// pure math func, simple data types
double func1(double x) { for (int i = 0; i < 1000000; i++) x = std::cos(x); return x; }
// takes in array
double func2(std::vector<double>& arr) { double sum = 0; for (auto v : arr) sum += v; return sum; }
// returns array
std::vector<int> func3(int x, int n) { std::vector<int> ans; for (int i = 0; i < n; i++) ans.push_back(x+i); return ans; }
// nested arrays
std::vector<std::vector<int>> func4(int x, int n) {
std::vector<std::vector<int>> ans; std::vector<int> ans1, ans2;
for (int i = 0; i < n; i++) ans1.push_back(x+i);
for (int i = 0; i < n; i++) ans2.push_back(x+i*2);
ans.push_back(ans1); ans.push_back(ans2); return ans;
}
// complex string manipulation, splitting things like "A,3\\nB,4", std::vector<std::pair<std::string, int>>
std::vector<std::pair<std::string, int>> func5(std::string text) {
std::vector<std::pair<std::string, int>> ans; std::string line;
std::istringstream f(text); std::pair<std::string, int> pair;
while (std::getline(f, line)) {
int pos = line.find(","); pair.first = line.substr(0, pos);
pair.second = std::stoi(line.substr(pos+1)); ans.push_back(pair);
} return ans;
}
PYBIND11_MODULE(genM1, m) {
m.def("func1", &func1); m.def("func2", &func2); m.def("func3", &func3);
m.def("func4", &func4); m.def("func5", &func5);
}\"\"\", "genM1", verbose=True) # this takes around 15s to run. Yes it's slow, but it works
# python-equivalent functions
def func1(x):
for i in range(1000000): x = math.cos(x)
return x
def func2(arr): return sum(arr)
def func3(x, n): return [x+i for i in range(n)]
def func4(x, n): return [[x+i for i in range(n)], [x+i*2 for i in range(n)]]
def func5(s): return [(x, int(y)) for x,y in [x.split(",") for x in s.split("\\n")]]
mod.func1(3) # 22.8 ms ± 1.83 ms, 7.6x faster
func1(3) # 174 ms ± 24.1 ms
x = list(range(100))
mod.func2(x) # 7.25 μs ± 761 ns, 3.1x slower
func2(x) # 2.33 μs ± 299 ns
mod.func3(3, 10) # 1.16 μs ± 97 ns, 1.2x slower
func3(3, 10) # 946 ns ± 128 ns
mod.func4(3, 10) # 2.23 μs ± 188 ns, 1.25x faster
func4(3, 10) # 2.78 μs ± 292 ns
s = "A,3\\nB,4\\nC,5\\nD,6\\nE,7\\nF,8\\nG,9"
mod.func5(s) # 4.5 μs ± 286 ns, 1.07x faster
func5(s) # 4.81 μs ± 866 ns
Behind the scenes, this function generates a C++ source file, compiles it into a
python C extension module, then loads it into the current interpreter session. So
the purpose of this is to very quickly drop down to C++ whenever the need arises.
Solutions like Cython are neat and all, but they're quite awkward to code in, and
don't have the full power of C++. Meanwhile, doing it like this gives you full
C++ features, as well as an easy python binding interface via pybind11.
Several header files are included by default, so you don't have to include them, like
<string>, <fstream>, etc. A list of them is in ``settings.cExt.includes``. You can
get a dict of all compiled modules via ``k1.compileCExt.mods``
Also, as you can see from the tiny benchmark results, it's not always faster to use
the C++ version: if the input and output translation takes longer than the
function itself, the call ends up slower overall. So although there's a lot of
potential for speedups, you have to be really careful about this, or else you risk
slowing things down and wasting a bunch of time.
:param cppCode: C++ source code. Common headers are included
:param moduleName: name of the module""" # compileCExt
# code mostly written by ChatGPT 4o. Verified to work tho # compileCExt
import pybind11; from setuptools import setup, Extension; from setuptools.command.build_ext import build_ext; import importlib.util; temp_dir = tempfile.mkdtemp() # compileCExt
print(f"temp_dir: {temp_dir}\n" if verbose else "", end=""); incls = k1.settings.cExt.includes | cli.apply(lambda x: f"#include <{x}>") | cli.join("\n") # compileCExt
cpp_file = f"""#include <pybind11/pybind11.h>\n#include <pybind11/stl.h>\nnamespace py = pybind11;\n{incls}\n""" + cppCode | cli.file(os.path.join(temp_dir, f"{moduleName}.cpp")) # compileCExt
ext_modules = [Extension(moduleName, sources=[cpp_file], include_dirs=[pybind11.get_include()], language="c++", extra_compile_args=["-O3", "-std=c++17"])] # compileCExt
class BuildExt(build_ext): # compileCExt
def run(self): build_ext.run(self) # compileCExt
def build_extension(self, ext): ext_path = self.get_ext_fullpath(ext.name); os.makedirs(os.path.dirname(ext_path), exist_ok=True); build_ext.build_extension(self, ext) # compileCExt
with (idenContext() if verbose else k1.captureStdout()): # compileCExt
setup(name=moduleName, ext_modules=ext_modules, cmdclass={"build_ext": BuildExt}, script_args=["build_ext", "--inplace"], options={"build_ext": {"build_lib": temp_dir}}); so_file = temp_dir | cli.ls() | cli.grep("cpython") | cli.item() # compileCExt
spec = importlib.util.spec_from_file_location(moduleName, so_file); module = importlib.util.module_from_spec(spec); spec.loader.exec_module(module); _cextMods[moduleName] = module; return module # compileCExt
compileCExt.mods = _cextMods # compileCExt
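# Minimal usage sketch (assumes a C++17 toolchain and pybind11 are installed);
# note that the name inside PYBIND11_MODULE must match the moduleName argument:
#   mod = k1.compileCExt("""
#   int add(int a, int b) { return a + b; }
#   PYBIND11_MODULE(mini, m) { m.def("add", &add); }
#   """, "mini")
#   mod.add(2, 3)        # == 5
#   k1.compileCExt.mods  # {"mini": <module 'mini' ...>}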