# Source code for k1lib._advanced

# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
# stdlib imports, deduplicated ('random' was previously imported twice) and sorted
import base64, collections, contextlib, functools, inspect, json, math, os, random, struct, sys, tempfile, threading, time
from collections import deque
import k1lib; import k1lib.cli as cli; k1 = k1lib
__all__ = ["log", "aes_encrypt", "aes_decrypt", "aes_encrypt_json", "aes_decrypt_json", "tempObj", "TimeSeries", "speed", "compileCExt"]
# state shared between log() and its background uploader thread (_thTarget):
# "loaded" = whether the thread has been started, "logMsgs" = pending messages
_logObj = {"loaded": False, "logMsgs": deque(), "path": None}
def _thTarget():                                                                 # _thTarget
    """Background thread body for :meth:`log`: opens a persistent websocket to
    the logs.mlexps.com ingest endpoint and drains ``_logObj["logMsgs"]`` forever."""
    import asyncio, base64, json; from k1lib import kws                          # _thTarget
    async def main():                                                            # _thTarget
        async with kws.WsClient("wss://ws.logs.mlexps.com/_k1_ingest") as ws:    # _thTarget
            while True:                                                          # _thTarget
                # busy-poll with a short sleep when the queue is empty, else push one message
                if len(_logObj["logMsgs"]) == 0: await asyncio.sleep(0.01)       # _thTarget
                else: await ws.send(_logObj["logMsgs"].popleft())                # _thTarget
    asyncio.new_event_loop().run_until_complete(main())                          # _thTarget
def log(path:str, obj:"any"):                                                    # log
    """Logs random debug statements to logs.mlexps.com server.
Example::

    k1.log("ggdrive/topic1", "some message")
    k1.log("ggdrive/topic1/sub2", {"some": "json", "object": 2})

    # I typically do it like this, so that I can filter down only the messages that I want based on severity
    k1.log("ggdrive/info", {"some": "json", "object": 2})
    k1.log("ggdrive/error", {"some": "json", "object": 2})

Visit the website https://logs.mlexps.com/watch/ggdrive, or
/watch/ggdrive/topic1, or /watch/ggdrive/topic1/sub2 to view all logs
coming in."""                                                                    # log
    # lazily spin up the uploader thread on first use. daemon=True because the
    # thread loops forever, and a non-daemon thread would block interpreter exit
    if not _logObj["loaded"]: _logObj["loaded"] = True; threading.Thread(target=_thTarget, daemon=True).start() # log
    # non-primitive objects are json-serialized then base64-encoded so they can
    # travel inside the slash-separated message format safely
    if not isinstance(obj, (str, float, int)):                                   # log
        obj = base64.b64encode(json.dumps(obj).encode()).decode()                # log
    _logObj["logMsgs"].append(f"{path}/{obj}")                                   # log
if k1lib.settings.startup.import_optionals:
    try:
        from scipy import stats
        __all__.append("pValue")
        def pValue(zScore):
            """2-sided p value of a particular z score. Requires :mod:`scipy`."""
            return stats.norm.sf(abs(zScore))*2
    except: pass
    try:
        Crypto = k1lib.dep("Crypto", "pycryptodome", url="https://pycryptodome.readthedocs.io/en/latest/")
        # stub defs so that calling any aes_* function without pycryptodome installed
        # touches Crypto.Cipher, triggering k1lib.dep()'s "please install" error.
        # The real implementations below shadow these once the imports succeed
        def aes_encrypt(plaintext:bytes) -> str: Crypto.Cipher
        def aes_decrypt(plaintext:bytes) -> str: Crypto.Cipher
        def aes_encrypt_json(obj:dict) -> str: Crypto.Cipher
        def aes_decrypt_json(ciphertext:str) -> dict: Crypto.Cipher
        from Crypto.Cipher import AES
        from Crypto.Random import get_random_bytes
        from Crypto.Util.Padding import pad, unpad
        def aes_encrypt(plaintext:bytes, key:bytes=None) -> str:
            """Encrypts a message using AES. Example::

    res = k1.aes_encrypt(b"some message") # can return '3HV7PKKQL2DLWQWBBTETQTXNMC4Q6DJ2FSS73A7NCRAX6K4ZZKXQ===='
    k1.aes_decrypt(res) # returns b"some message"

After encrypting, this is encoded using base32, ready to be used in urls. This
function is a convenience function meant for small messages here and there, and
is not intended for heavy duty encryption. The key is automatically generated,
and is configurable via ``settings.cred.aes.key``

See also: :meth:`aes_encrypt_json`

:param plaintext: plaintext to encrypt
:param key: 128 bit key, if not specified then will auto generate one on library load at ``settings.cred.aes.key``"""
            if not isinstance(plaintext, bytes): plaintext = f"{plaintext}".encode()
            # CBC mode with a random iv (generated by AES.new); iv is prepended to the ciphertext
            cipher = AES.new(key or k1lib.settings.cred.aes.key, AES.MODE_CBC); ciphertext = cipher.encrypt(pad(plaintext, AES.block_size))
            # base32 + character swaps ("/"->"_", "+"->"-") to make the result url-safe
            return base64.b32encode(cipher.iv + ciphertext).decode().replace(*"/_").replace(*"+-")
        def aes_decrypt(ciphertext:str, key:bytes=None) -> bytes:
            """Decrypts a message using AES. See :meth:`aes_encrypt` for more information.

:param ciphertext: ciphertext to decrypt
:param key: 128 bit key, if not specified then will auto generate one on library load at ``settings.cred.aes.key``"""
            # reverse the url-safe swaps, then split off the iv that aes_encrypt prepended
            ciphertext = base64.b32decode(ciphertext.replace(*"-+").replace(*"_/").encode()); iv = ciphertext[:AES.block_size]; cipher = AES.new(key or k1lib.settings.cred.aes.key, AES.MODE_CBC, iv)
            return unpad(cipher.decrypt(ciphertext[AES.block_size:]), AES.block_size)
        def aes_encrypt_json(obj:dict) -> str:
            """Encrypts a Python object using AES. Example::

    a = k1.aes_encrypt_json({"a": 3})
    k1.aes_decrypt_json(a) # returns {"a": 3}
    k1.aes_decrypt_json(k1.aes_encrypt_json([1, 2, 3])) # returns [1, 2, 3]
    k1.aes_decrypt_json(k1.aes_encrypt_json("abc")) # returns "abc"

See also: :meth:`aes_encrypt`"""
            return aes_encrypt(json.dumps(obj).encode())
        def aes_decrypt_json(ciphertext:str) -> dict:
            """Decrypts a message into a Python object. See :meth:`aes_encrypt_json`"""
            return json.loads(aes_decrypt(ciphertext).decode())
        k1lib.settings.cred.add("aes", k1lib.Settings().add("key", get_random_bytes(16), "16-byte aes key, used in aes_encrypt() and aes_decrypt()", sensitive=True), "anything related to AES block cipher")
    except: pass
k1lib.settings.add("tempObjLifetime", 60, "Default lifetime in seconds used in k1.tempObj()")
# storage for tempObj(): key -> object, key -> expiry timestamp, and the key generator
_tempObjs = {}; _tempTimeouts = {}; _tempObj_autoInc = k1lib.AutoIncrement(prefix="_k1_tempObj_")
def tempObj(x, timeout=None):                                                    # tempObj
    """Stores an object that's meant to exist for a short amount of time, and
then will be automatically deleted. Example::

    key = k1.tempObj("Suika Ibuki", 10) # stores some string that will only last for 10 seconds
    k1.tempObj(key)  # returns "Suika Ibuki"
    time.sleep(20)
    k1.tempObj(key)  # returns None

The default timeout value is 60 seconds, configurable in
:data:`~k1lib.settings`.tempObjLifetime"""
    # a string with the "_k1_tempObj_" prefix is a key from an earlier call -> lookup mode
    if isinstance(x, str) and x.startswith("_k1_tempObj_"): return _tempObjs.get(x, None)
    else:
        # store mode: generate a fresh key, remember the object and its expiry deadline
        k = _tempObj_autoInc()
        if timeout is None: timeout = k1lib.settings.tempObjLifetime
        _tempObjs[k] = x; _tempTimeouts[k] = time.time() + timeout; return k
def tempCleanupThread():                                                         # tempCleanupThread
    """Background daemon loop: once a second, deletes tempObj() entries whose
    expiry deadline has passed."""
    while True:
        now = time.time()
        # snapshot with list() because we delete from the dict while scanning it
        for k, v in list(_tempTimeouts.items()):
            if now > v: del _tempObjs[k]; del _tempTimeouts[k]
        time.sleep(1)
threading.Thread(target=tempCleanupThread, daemon=True).start()
# TimeSeries module state: name -> TimeSeries, idx -> TimeSeries, plus generators for both keys
_time = time.time; _timeSeriesD = {}; _timeSeriesID = {}; _timeSeriesAutoInc = k1.AutoIncrement(prefix="_k1_ts_"); _timeSeriesIdxAutoInc = k1.AutoIncrement()
class TimeSeries:                                                                # TimeSeries
    def __init__(self, name:str=None, fn:str=None, storeRaw:bool=True, retention:int=7*86400, coldStore:bool=False):
        """Manages time series data, compresses them, back them up on disk if
necessary. Example::

    ts1 = k1.TimeSeries(name="ts1")
    ts1.append(3, 4, 5) # do this anywhere you'd like. Saves 1 data point containing 3 floats
    for i in range(600): # deposits 600 samples over 1 minute time span
        ts1.append(random.random(), random.random(), random.random()); time.sleep(0.1)
    ts1.getRaw()  # returns something like [[1737213223.4139452, (3, 4, 5)], ...]
    ts1.getRate() # returns something like [(1737213313.0752494, 10.066128035852211), ...]
    ts1.getPert() # returns something like [[1737213568.9260075, [(0.009, 0.07, 0.47, 0.89, 0.99), ...]], ...]

For :meth:`getRate`, first number is the timestamp, second is the number of data
points/second. For :meth:`getPert`, this returns the percentiles of the input
data (0%, 10%, 50%, 90%, 100%) for each variable.

This class exists so you can spam time series everywhere and get lots of
functionality out of the box, without an external server (Prometheus, Postgres,
etc.). All data is stored in several tables inside a sqlite file; each
TimeSeries gets its own file. Rough performance numbers: write 100k points/s,
read 400k points/s, ~50 bytes/point for few (~3) variables.

Every method also accepts an index number, useful when e.g. each idx is a device id::

    ts1.appendIdx(2, 3, 4, 5) # index 2 with 3 values (3, 4, 5)
    ts1.getRaw(idx=2); ts1.getPert(idx=2)

You can get all TimeSeries via :meth:`allData`.

.. note:: Performance details

    ``.append()`` only appends to a plain in-memory list (<1us). Two background
    threads collate the data: a fast one (10s scan) distributes new points to
    3 internal buffers ("rawRaw", "rateRaw", "pertD") and flushes them to
    sqlite once big enough (rawRaw: any, rateRaw: >10, pertD: >100 per idx);
    a slow one (60s scan) deletes rows older than ``retention`` and optionally
    packs them into an efficient binary cold-storage file (14 + nVars*4
    bytes/point, ~5x smaller than sqlite for 3 variables). Consequences:
    data may lag .append() by ~10s, too few appends may never show up in
    rate/percentile views, and unflushed in-memory data is lost on process exit.

:param name: just for cosmetic, to remind you what this does
:param fn: sqlite file name to store this time series data. If not specified, then stores database in memory
:param storeRaw: whether to store raw data points
:param retention: seconds before deleting old data point permanently, default 1 week
:param coldStore: if True, when data points past retention time, it will be packed into a single binary file for cold storage"""
        if coldStore and fn is None: raise Exception("If using cold storage, has to specify file name!") # TODO: ts1 | aS(repr), ts1 | toHtml()
        self._initialized = False; self.name = name or _timeSeriesAutoInc(); self.idx = _timeSeriesIdxAutoInc(); self.fn = fn; self.storeRaw = storeRaw; self.retention = retention; self.coldStore = coldStore; self.dbLock = threading.Lock()
        # name doubles as a url path component in the flask management plane, so no slashes
        if "/" in self.name: raise Exception("Can't have forward slash in the name")
        if self.name in _timeSeriesD: raise Exception(f"Name '{self.name}' has appeared before. Please use a different name")
        self._raw = []; self._rawRaw = []; self._rateRaw = []; _timeSeriesD[self.name] = _timeSeriesID[self.idx] = self; self._setupDb(); self._pertD = collections.defaultdict(lambda: []); self._initialized = True # _pertD maps idx -> [time, values]
def _setupDb(self): # TimeSeries fn = self.fn # TimeSeries if fn is not None and os.path.exists(f"{fn}.db"): self._s = s = cli.sql(f"{fn}.db", mode="lite")["default"]; self._dbRaw = s["raw"]; self._dbRate = s["rate"]; self._dbPert = s["pert"] # TimeSeries else: # TimeSeries self._s = s = cli.sql(":memory:" if fn is None else f"{fn}.db", mode="lite")["default"]; s.query("CREATE TABLE rate (id INTEGER PRIMARY KEY AUTOINCREMENT, time INTEGER, rate REAL);"); s.query("CREATE INDEX rate_time ON rate (time);"); # TimeSeries s.query("CREATE TABLE raw (id INTEGER PRIMARY KEY AUTOINCREMENT, time INTEGER, idx INTEGER, data BLOB);"); s.query("CREATE INDEX raw_time ON raw (time);"); s.query("CREATE INDEX raw_idx ON raw (idx);"); # .data is struct.pack("ffff", values) # TimeSeries s.query("CREATE TABLE pert (id INTEGER PRIMARY KEY AUTOINCREMENT, time INTEGER, idx INTEGER, data BLOB);"); s.query("CREATE INDEX pert_time ON pert (time);"); s.query("CREATE INDEX pert_idx ON pert (idx);"); # .data is struct.pack of [*n*[0, 10, 50, 90, 100]] # TimeSeries while s["pert"] is None: print("."); time.sleep(0.1) # TimeSeries self._dbRaw = s["raw"]; self._dbPert = s["pert"]; self._dbRate = s["rate"] # TimeSeries
[docs] @staticmethod # TimeSeries def allData(): return _timeSeriesD # TimeSeries
[docs] @staticmethod # TimeSeries def allIData(): return _timeSeriesID # TimeSeries
[docs] @k1.cache(timeout=10, name="ts_idxs", docs="caches k1.TimeSeries idxs, aka all available idx") # TimeSeries def idxs(self) -> "list[int]": # TimeSeries """Grabs all available idxs""" # TimeSeries if self.storeRaw: return [x[0] for x in self._dbRaw.query("select distinct idx from raw")] # TimeSeries return [x[0] for x in self._dbPert.query("select distinct idx from pert")] # TimeSeries
[docs] def append(self, *values): sig = "f"*len(values); self._raw.append([_time(), 0, values, struct.pack(sig, *values)]); return values # TimeSeries
[docs] def appendIdx(self, idx, *values): sig = "f"*len(values); self._raw.append([_time(), idx, values, struct.pack(sig, *values)]); return values # TimeSeries
[docs] def getRaw(self, startTime:int=None, stopTime:int=None, idx:int=0, limit:int=1000000): # TimeSeries """Grabs raw data of this time series. Returns something like ``[[1737213223.4139452, (3, 4, 5)], ...]`` """ # TimeSeries if not self.storeRaw: raise Exception(".storeRaw is False, so all raw data has been deleted") # TimeSeries s = f"select time, data from raw where idx = {idx}" # TimeSeries if startTime: s += f" and time >= {startTime}" # TimeSeries if stopTime: s += f" and time < {stopTime}" # TimeSeries s += f" order by time limit {limit}"; data = self._dbRaw.query(s) # TimeSeries if len(data) == 0: return [] # TimeSeries s = (len(data[0][1])//4)*"f"; res = [] # TimeSeries try: # fast way, caches "s" computation # TimeSeries for t, vs in data: res.append([t, struct.unpack(s, vs)]) # TimeSeries except: # slow way, in case number of variables are different # TimeSeries for t, vs in data: res.append([t, struct.unpack((len(vs)//4)*"f", vs)]) # TimeSeries return res # TimeSeries
[docs] def getRate(self, startTime:int=None, stopTime:int=None, limit:int=10000): # TimeSeries """Grabs data ingest rate of this time series. Returns something like ``[(1737213313.0752494, 10.066128035852211), ...]``""" # TimeSeries s = f"select time, rate from rate where true" # TimeSeries if startTime: s += f" and time >= {startTime}" # TimeSeries if stopTime: s += f" and time < {stopTime}" # TimeSeries s += f" order by time limit {limit}"; data = self._dbRate.query(s) # TimeSeries return data # TimeSeries
[docs] def getPert(self, startTime:int=None, stopTime:int=None, idx:int=0, limit:int=10000): # TimeSeries """Grabs data percentiles of this time series. Returns something like ``[[1737213568.9260075, [(0.009, 0.07, 0.47, 0.89, 0.99), (0.001, 0.11, 0.56, 0.90, 0.99), (0.0006, 0.08, 0.46, 0.89, 0.99)]], ...]``""" # TimeSeries def _batched(x): return [x[5*i:5*(i+1)] for i in range(len(x)//5)] # TimeSeries s = f"select time, data from pert where idx = {idx}" # TimeSeries if startTime: s += f" and time >= {startTime}" # TimeSeries if stopTime: s += f" and time < {stopTime}" # TimeSeries s += f" order by time limit {limit}"; data = self._dbPert.query(s) # TimeSeries if len(data) == 0: return [] # TimeSeries nvars = len(data[0][1])//4; s = nvars*"f"; res = [] # TimeSeries try: # fast way, caches "s" computation # TimeSeries for t, vs in data: x = struct.unpack(s, vs); res.append([t, [x[5*i:5*(i+1)] for i in range(nvars//5)]]) # TimeSeries except: # slow way, in case number of variables are different between lines # TimeSeries for t, vs in data: x = struct.unpack((len(vs)//4)*"f", vs); res.append([t, [x[5*i:5*(i+1)] for i in range(len(x)//5)]]) # TimeSeries return res # TimeSeries
def _ls(self): return self.getRaw(limit=100) # TimeSeries @k1.cache(timeout=60, maxsize=1000, name="ts_len", docs="caches k1.TimeSeries lengths") # TimeSeries def __len__(self): # TimeSeries if not self.storeRaw: raise Exception(f"TimeSeries '{self.name}'.storeRaw is False, no length available") # TimeSeries res = self._dbRaw.query("""select max(id) from raw limit 1;"""); return res[0][0] if len(res) > 0 else 0 # TimeSeries def __repr__(self): return f"<TimeSeries name='{self.name}' fn='{self.fn}' storeRaw={self.storeRaw} retention={self.retention} coldStore={self.coldStore}>" # TimeSeries
[docs] def plotRaw(self, startTime, endTime, idx=0, dIdx="all", window=1): # TimeSeries import matplotlib.pyplot as plt; s = self; data = s.getRaw(startTime, endTime, idx=idx) | ~cli.apply(lambda x,y: [[x, e] for e in y]) | cli.T() | cli.apply(cli.T() | (cli.apply(cli.window(10) | cli.toMean().all()) if window > 1 else cli.iden())) | cli.deref() # TimeSeries with k1.mplLock: # TimeSeries if len(data) == 0: return "No data available" # TimeSeries if dIdx == "all": data | cli.apply(~cli.aS(plt.dplot, 7, True, ".-")) | cli.ignore() # TimeSeries else: data | cli.rItem(dIdx) | ~cli.aS(plt.dplot, 7, True, ".-") # TimeSeries plt.title(f"Raw '{self.name}', idx {idx}, dIdx {dIdx}, window {window}"); plt.tight_layout(); im = plt.gcf() | cli.toImg() # TimeSeries return im | cli.toHtml() # TimeSeries
[docs] @staticmethod # TimeSeries def splotRaw(name, startTime, endTime, idx=0, dIdx="all", window=1): # TimeSeries d = k1.TimeSeries.allData(); s = d.get(name, None) # TimeSeries if s is None: return f"No TimeSeries with the name '{name}' found" # TimeSeries return s.plotRaw(startTime, endTime, idx=0, dIdx="all", window=1) # TimeSeries
[docs] def plotRate(self, startTime, endTime, window=1): # TimeSeries import matplotlib.pyplot as plt; s = self; data = s.getRate() | (cli.T.wrap(cli.apply(cli.window(window) | cli.toMean().all())) if window > 1 else cli.iden()) | cli.T() | cli.deref() # TimeSeries with k1.mplLock: # TimeSeries if len(data) == 0: return "No data available" # TimeSeries data | ~cli.aS(plt.dplot, 7, True, ".-"); plt.xlabel("Time"); plt.ylabel("Rate (calls/s)"); plt.title(f"Rate '{self.name}', window {window}"); plt.tight_layout(); im = plt.gcf() | cli.toImg() # TimeSeries return im | cli.toHtml() # TimeSeries
[docs] @staticmethod # TimeSeries def splotRate(name, startTime, endTime, window=1): # TimeSeries d = k1.TimeSeries.allData(); s = d.get(name, None) # TimeSeries if s is None: return f"No TimeSeries with the name '{name}' found" # TimeSeries return s.plotRate(startTime, endTime, window=1) # TimeSeries
[docs] def plotPert(self, startTime, stopTime, idx=0, dIdx=0, window=1): # TimeSeries import matplotlib.pyplot as plt; s = self; data = s.getPert(startTime, stopTime, idx) | cli.apply(lambda x: x[dIdx], 1) | ~cli.apply(lambda x,y: [[x, e] for e in y]) | cli.T() | cli.apply(cli.T() | (cli.apply(cli.window(window) | cli.toMean().all()) if window > 1 else cli.iden())) | cli.deref() # TimeSeries with k1.mplLock: # TimeSeries if len(data) == 0: return "No data available" # TimeSeries data | cli.apply(~cli.aS(plt.dplot, 7, True, ".-")) | cli.ignore(); plt.legend(["0%", "10%", "50%", "90%", "100%"]); plt.xlabel("Time"); plt.ylabel("Value"); plt.title(f"Percentile '{self.name}', idx {idx}, dIdx {dIdx}, window {window}"); plt.tight_layout(); im = plt.gcf() | cli.toImg() # TimeSeries return im | cli.toHtml() # TimeSeries
[docs] @staticmethod # TimeSeries def splotPert(name, startTime, stopTime, idx=0, dIdx=0, window=1): # TimeSeries d = k1.TimeSeries.allData(); s = d.get(name, None) # TimeSeries if s is None: return f"No TimeSeries with the name '{name}' found" # TimeSeries return s.plotPert(startTime, stopTime, idx=0, dIdx=0, window=1) # TimeSeries
[docs] @staticmethod # TimeSeries def flask(app, **kwargs): # TimeSeries """Attaches a TimeSeries management plane to a flask app. Example:: app = flask.Flask(__name__) k1.TimeSeries.flask(app) app.run(host="0.0.0.0", port=80) Then, you can access the route "/k1/ts" to see an overview of all TimeSeries :param app: flask app object :param kwargs: extra random kwargs that you want to add to ``app.route()`` function""" # TimeSeries bootstrapJs = """ async function dynamicLoad(selector, endpoint, rawHtml=null) { // loads a remote endpoint containing html and put it to the selected element. If .rawHtml is available, then don't send any request, and just use that html directly const elem = document.querySelector(selector); elem.innerHTML = rawHtml ? rawHtml : (await (await fetch(endpoint)).text()); await new Promise(r => setTimeout(r, 100)); let currentScript = ""; try { for (const script of elem.getElementsByTagName("script")) { currentScript = script.innerHTML; eval(script.innerHTML); } } catch (e) { console.log(`Error encountered: `, e, e.stack, currentScript); } }"""; bootstrapHtml = f""" <head> <meta charset="UTF-8"><title>DHCP low level server</title><meta name="viewport" content="width=device-width, initial-scale=1.0"> <link href="https://static.aigu.vn/daisyui.css" rel="stylesheet" type="text/css" /> <style> h1 {{ font-size: 2.25rem !important; line-height: 2.5rem !important; }} h2 {{ font-size: 1.5rem !important; line-height: 2rem !important; margin: 10px 0px !important; }} h3 {{ font-size: 1.125rem !important; line-height: 1.75rem !important; margin: 6px 0px !important; }} textarea {{ border: 1px solid; padding: 8px 12px !important; border-radius: 10px !important; }} body {{ padding: 12px; }} </style><script>{bootstrapJs}</script> </head>"""; tss = k1.TimeSeries.allData() # TimeSeries # time series stuff # TimeSeries @app.route("/k1/ts", **kwargs) # TimeSeries def ts_index(): # TimeSeries pre = cli.init._jsDAuto(); ui1 = tss.items() | ~cli.apply(lambda k,v: [k, 
v.fn, v.storeRaw, v.retention, v.coldStore, v | (cli.tryout() | cli.aS(len))]) | cli.deref() | (cli.toJsFunc("term") | cli.grep("${term}") | k1.viz.Table(["name", "fn", "storeRaw", "retention", "coldStore", "#hits"], height=600, onclickFName=f"{pre}_select", selectable=True, sortF=True)) | cli.op().interface() | cli.toHtml() # TimeSeries return f"""{bootstrapHtml}<h1>TimeSeries</h1><div style="overflow-x: auto; margin-top: 12px">{ui1}</div><div id="{pre}_details"></div> <script>\nfunction {pre}_select(row, i, e) {{ dynamicLoad("#{pre}_details", `/k1/ts/${{row[0]}}/fragment`); }}</script>""" # TimeSeries @app.route("/k1/ts/<name>/fragment", **kwargs) # TimeSeries def ts_name(name): # TimeSeries s = tss.get(name, None) # TimeSeries if s is None: return f"No TimeSeries '{name}' found" # TimeSeries idxs = s.idxs(); pre = cli.init._jsDAuto(); ui1 = idxs | (cli.toJsFunc("idx") | cli.grep("${idx}") | cli.batched(10, True) | k1.viz.Table(sortF=True)) | cli.op().interface() | cli.toHtml(); return f""" <h2>TimeSeries '{s.name}'</h2><div style="border: 1px solid black; padding: 8px">{ui1}</div> <div style="display: grid; grid-template-columns: auto auto; row-gap: 8px; column-gap: 8px; align-items: center; margin-top: 12px; width: fit-content"> <div>idx: </div><input id="{pre}_idx" class="input input-bordered" value="0" /> <div>dIdx: </div><input id="{pre}_dIdx" class="input input-bordered" value="0" /> <div>timeStr: </div><input id="{pre}_timeStr" class="input input-bordered" value="1 day" /> <div>window: </div><input id="{pre}_window" class="input input-bordered" value="1" /> </div><button id="{pre}_graphBtn" class="btn" style="margin-top: 8px">Graph</button><div id="{pre}_detailsRaw"></div><div id="{pre}_detailsRate"></div><div id="{pre}_detailsPert"></div> <script> (async () => {{ let dS = (x) => document.querySelector(x); dS("#{pre}_graphBtn").onclick = async () => {{ let idx = dS("#{pre}_idx").value; let dIdx = dS("#{pre}_dIdx").value; let window = 
dS("#{pre}_window").value; let timeStr = dS("#{pre}_timeStr").value; dynamicLoad("#{pre}_detailsRaw", `/k1/ts/{name}/fragment/${{idx}}/raw/${{dIdx}}/${{window}}/${{timeStr}}`); dynamicLoad("#{pre}_detailsRate", `/k1/ts/{name}/fragment/0/rate/0/${{window}}/${{timeStr}}`); if (dIdx !== "all") dynamicLoad("#{pre}_detailsPert", `/k1/ts/{name}/fragment/${{idx}}/pert/${{dIdx}}/${{window}}/${{timeStr}}`); }} }})(); </script>""" # TimeSeries @app.route("/k1/ts/<name>/fragment/<int:idx>/raw/<dIdx>/<int:window>/<timeStr>", **kwargs) # TimeSeries def ts_raw(name, idx, dIdx, window, timeStr): return k1.TimeSeries.splotRaw(name, *k1.parseTimeStr(timeStr), idx, "all" if dIdx == "all" else int(dIdx), window) # TimeSeries @app.route("/k1/ts/<name>/fragment/0/rate/0/<int:window>/<timeStr>", **kwargs) # TimeSeries def ts_rate(name, window, timeStr): return k1.TimeSeries.splotRate(name, *k1.parseTimeStr(timeStr), window) # TimeSeries @app.route("/k1/ts/<name>/fragment/<int:idx>/pert/<int:dIdx>/<int:window>/<timeStr>", **kwargs) # TimeSeries def ts_pert(name, idx, dIdx, window, timeStr): return k1.TimeSeries.splotPert(name, *k1.parseTimeStr(timeStr), idx, "all" if dIdx == "all" else int(dIdx), window) # TimeSeries
[docs] def dummyLoad(self, niter=600, frac=0.1): # TimeSeries """Simulates a dummy load on this time series. Read source of this method to understand what it does. :param niter: number of iterations :param frac: fraction of new random numbers and extra time to wait. 0 for more deterministic, 1 for more chaotic""" # TimeSeries a = random.random(); b = random.random(); c = random.random() # TimeSeries for i in range(niter) | cli.tee().crt(): # TimeSeries a += (random.random()-0.5)*frac; b += (random.random()-0.5)*frac; c += (random.random()-0.5)*frac # TimeSeries self.append(a, b, c); print(self.getRaw() | cli.shape(0), end=""); time.sleep(0.1 + random.random()*frac) # TimeSeries
@k1.cron(delay=11, daemon=True, delayedStart=5, name="ts_main", docs="k1.TimeSeries fast scan thread") # _timeSeriesThread
def _timeSeriesThread():
    """Fast scan loop (~every 11s): moves freshly-appended data points into the
    rate/raw/percentile buffers of each TimeSeries, flushing each buffer to
    sqlite once it's big enough."""
    for idx, ts in _timeSeriesD.items():
        if not ts._initialized: continue
        now = time.time() # _raw element format: [time, idx, values, packed values]
        if len(ts._raw) > 0: # transfer new raw data to other buffers to be processed later
            _raw = ts._raw; ts._raw = []; ts._rateRaw.extend(_raw); ts._rawRaw.extend(_raw)
            for t,idx,vs,hvs in _raw: ts._pertD[idx].append([t, vs])
        n = len(ts._rateRaw)
        if n > 10: # at least 10 data points before collating
            _rateRaw = ts._rateRaw; ts._rateRaw = []
            _max = max(x[0] for x in _rateRaw); _min = min(x[0] for x in _rateRaw); deltaT = _max - _min
            # guard: if all timestamps are identical (clock resolution), skip the
            # sample instead of dividing by zero
            if deltaT > 0: ts._dbRate.insert(time=_min, rate=len(_rateRaw)/deltaT)
        for idx, tvs in list(ts._pertD.items()):
            if len(tvs) > 100: # enough samples -> compute (0,10,50,90,100)% percentiles per variable
                ts._pertD[idx] = []; n = len(tvs); data = tvs | cli.cut(1) | cli.T() | cli.sort(None).all() | cli.apply(lambda vs: [vs[0], vs[n//10], vs[n//2], vs[9*n//10], vs[-1]]) | cli.joinSt() | cli.aS(list)
                ts._dbPert.insert(time=tvs[0][0], idx=idx, data=struct.pack("f"*len(data), *data))
        if ts.storeRaw and len(ts._rawRaw) > 0: # bulk-insert raw points in one statement
            rr = ts._rawRaw; ts._rawRaw = []
            ts._dbRaw.query(f"""INSERT INTO raw ( time, idx, data ) VALUES """ + ", ".join(f"({t}, {idx}, ?)" for t,idx,vs,hvs in rr), *[hvs for t,idx,vs,hvs in rr])
@k1.cron(delay=61, daemon=True, delayedStart=11, name="ts_retention", docs="k1.TimeSeries slow scan thread for retention control")
def _timeSeriesRetentionThread():
    """Slow scan loop (~every 61s): deletes rows older than each TimeSeries's
    retention window, optionally packing them into the .cold binary file first."""
    for idx, ts in _timeSeriesD.items():
        if not ts._initialized: continue
        now = time.time(); data = ts._dbRaw.query("select time from raw order by time limit 1")
        if len(data) == 0: continue
        beginTime = data[0][0] # oldest raw timestamp
        if now - beginTime > ts.retention:
            t = now - ts.retention # timestamp to slice off
            if ts.coldStore: data = ts._dbRaw.query(f"select time, idx, data from raw where time < {t} order by time")
            ts._dbRaw .query(f"delete from raw where time < {t}"); ts._dbPert.query(f"delete from pert where time < {t}"); ts._dbRate.query(f"delete from rate where time < {t}")
            # cold record format: 0x00 marker, 1-byte record length, then packed (double time, int idx) + raw blob
            if ts.coldStore: data | ~cli.apply(lambda t,idx,data: struct.pack("di", t, idx) + data) | cli.apply(lambda x: b"\x00" + struct.pack("B", len(x)+2) + x) >> cli.file(f"{ts.fn}.cold")
# speed decorator state: name -> {"name", "docs", "func", "ts"} plus default-name generator
_speedAutoInc = k1.AutoIncrement(prefix="_k1_speed_"); _speedData = {}
class speed(cli.BaseCli):                                                        # speed
    def __init__(self, name=None, fn=None, docs=None, coldStore=False):
        """Tracks and benchmarks certain functions, and monitor them through time
with reports in order to deploy them absolutely everywhere. Example::

    @k1.speed(name="some func description")
    def f(x): return x*3

You can get a list of all speed k1.TimeSeries objects via ``k1.TimeSeries.allData``

:param name: optional name to show up in :meth:`allData`
:param fn: file name. If specified, will store speed data in sqlite database at this path, else store in memory
:param docs: optional docs to show up in :meth:`allData`
:param coldStore: if True, stores old speed data in condensed binary file. See more at :class:`TimeSeries`"""
        self.name = name or _speedAutoInc(); self.fn = fn; self.docs = docs; self.coldStore = coldStore
        # name doubles as a url path component in the flask management plane, so no slashes
        if "/" in self.name: raise Exception("Can't have forward slash in the name")
    def __call__(self, f):
        """Decorates ``f``: every call's wall-clock duration is appended to a
        dedicated :class:`TimeSeries`"""
        if self.name in _speedData: raise Exception(f"Name '{self.name}' has appeared before. Please use a different name")
        _speedData[self.name] = self.obj = {"name": self.name, "docs": self.docs, "func": f, "ts": k1.TimeSeries(name=f"speed: {self.name}", fn=self.fn, coldStore=self.coldStore)}
        ts = self.obj["ts"]; _time = time.time # bind to locals: cheap lookups in the hot wrapper
        def wrapper(*args, **kwargs): beginTime = _time(); res = f(*args, **kwargs); duration = _time() - beginTime; ts.append(duration); return res
        functools.update_wrapper(wrapper, f); return wrapper
    @staticmethod
    def allData():
        """Returns the dict mapping name -> {"name", "docs", "func", "ts"} for all
        @k1.speed-decorated functions"""
        return _speedData
    @staticmethod
    def flask(app, **kwargs):
        """Attaches a speed management plane to a flask app. Example::

    app = flask.Flask(__name__)
    k1.speed.flask(app)
    app.run(host="0.0.0.0", port=80)

Then, you can access the route "/k1/speed" to see an overview of all speed
benchmarks. However, doing ``k1.TimeSeries.flask(app)`` and access at "/k1/ts"
would be more beneficial, as that contains all the graphs and data

:param app: flask app object
:param kwargs: extra random kwargs that you want to add to ``app.route()`` function"""
        @app.route("/k1/speed", **kwargs)
        def index():
            d = k1.speed.allData(); ui1 = d.items() | ~cli.apply(lambda k,v: [k, v["func"].__name__, inspect.getfile(v["func"]), v["ts"].name, v["docs"]]) | cli.deref() | (cli.toJsFunc("term") | cli.grep("${term}") | k1.viz.Table(["name", "func's name", "func's file name", "TimeSeries name", "docs"], height=600, sortF=True)) | cli.op().interface() | cli.toHtml()
            return f"""<h1>Speed</h1><div style="overflow-x: auto">{ui1}</div>"""
@contextlib.contextmanager # speed def idenContext(): yield True # idenContext _cextMods = {}; k1.settings.add("cExt", k1.Settings().add("includes", ["fstream", "iostream", "sstream", "mutex", "string", "vector", "cmath", "random"], "header files to include"), "k1.compileCExt()-related settings"); # idenContext
def compileCExt(cppCode:str, moduleName:str, verbose:bool=False):                # compileCExt
    """Conveniently compiles a python C extension module and returns it. Example::

    mod = k1.compileCExt(\"\"\"
// pure math func, simple data types
double func1(double x) { for (int i = 0; i < 1000000; i++) x = std::cos(x); return x; }
// takes in array
double func2(std::vector<double>& arr) { double sum = 0; for (auto v : arr) sum += v; return sum; }
// returns array
std::vector<int> func3(int x, int n) { std::vector<int> ans; for (int i = 0; i < n; i++) ans.push_back(x+i); return ans; }
// nested arrays
std::vector<std::vector<int>> func4(int x, int n) {
    std::vector<std::vector<int>> ans; std::vector<int> ans1, ans2;
    for (int i = 0; i < n; i++) ans1.push_back(x+i);
    for (int i = 0; i < n; i++) ans2.push_back(x+i*2);
    ans.push_back(ans1); ans.push_back(ans2); return ans;
}
// complex string manipulation, splitting things like "A,3\\nB,4", std::vector<std::pair<std::string, int>>
std::vector<std::pair<std::string, int>> func5(std::string text) {
    std::vector<std::pair<std::string, int>> ans; std::string line; std::istringstream f(text);
    std::pair<std::string, int> pair;
    while (std::getline(f, line)) {
        int pos = line.find(","); pair.first = line.substr(0, pos); pair.second = std::stoi(line.substr(pos+1)); ans.push_back(pair);
    }
    return ans;
}
PYBIND11_MODULE(genM1, m) { m.def("func1", &func1); m.def("func2", &func2); m.def("func3", &func3); m.def("func4", &func4); m.def("func5", &func5); }\"\"\", "genM1", verbose=True) # this takes around 15s to run. Yes it's slow, but it works

    # python-equivalent functions
    def func1(x):
        for i in range(1000000): x = math.cos(x)
        return x
    def func2(arr): return sum(arr)
    def func3(x, n): return [x+i for i in range(n)]
    def func4(x, n): return [[x+i for i in range(n)], [x+i*2 for i in range(n)]]
    def func5(s): return [(x, int(y)) for x,y in [x.split(",") for x in s.split("\\n")]]

    mod.func1(3)     # 22.8 ms ± 1.83 ms, 7.6x faster
    func1(3)         # 174 ms ± 24.1 ms
    x = list(range(100))
    mod.func2(x)     # 7.25 μs ± 761 ns, 3.1x slower
    func2(x)         # 2.33 μs ± 299 ns
    mod.func3(3, 10) # 1.16 μs ± 97 ns, 1.2x slower
    func3(3, 10)     # 946 ns ± 128 ns
    mod.func4(3, 10) # 2.23 μs ± 188 ns, 1.25x faster
    func4(3, 10)     # 2.78 μs ± 292 ns
    s = "A,3\\nB,4\\nC,5\\nD,6\\nE,7\\nF,8\\nG,9"
    mod.func5(s)     # 4.5 μs ± 286 ns, 1.07x faster
    func5(s)         # 4.81 μs ± 866 ns

Behind the scenes, this function generates a C source file, compiles it into
a python C extension module, then loads it in the current interpreter session.
So purpose of this is to very quickly drop down to C whenever the need arises.
Solutions like Cython is neat and all, but it's quite awkward to code in, and
doesn't have the full power of C++. Meanwhile, doing it like this gives you
full C++ features, as well as an easy python binding interface via pybind11.

Several header files are included by default, so you don't have to include
them, like <string>, <fstream>, etc. A list of them are in
``settings.cExt.includes``. You can get a dict of all compiled modules via
``k1.compileCExt.mods``

Also, as you can see from the tiny benchmark results, it's not always faster
to use the C version, if input and output translation operations takes longer
than the function itself. So although there's a lot of potential for speedups,
you have to be really careful about this, or else you risk slowing it down
and wasting a bunch of time.

:param cppCode: C++ source code. Common headers are included
:param moduleName: name of the module
:param verbose: if True, prints the temp build dir and lets setuptools'
    compile output through to stdout, else captures/hides it"""                  # compileCExt
    # code mostly written by ChatGPT 4o. Verified to work tho                    # compileCExt
    # lazy imports: setuptools/pybind11 are only needed when actually compiling  # compileCExt
    import pybind11; from setuptools import setup, Extension; from setuptools.command.build_ext import build_ext; import importlib.util; temp_dir = tempfile.mkdtemp() # compileCExt
    # builds the default "#include <...>" preamble from settings.cExt.includes   # compileCExt
    print(f"temp_dir: {temp_dir}\n" if verbose else "", end=""); incls = k1.settings.cExt.includes | cli.apply(lambda x: f"#include <{x}>") | cli.join("\n") # compileCExt
    # prepends pybind11 boilerplate + default includes to the user's code, then writes # compileCExt
    # it to <temp_dir>/<moduleName>.cpp. Note precedence: '+' binds tighter than '|',  # compileCExt
    # so the full concatenated source is piped to cli.file(), yielding the file's path # compileCExt
    cpp_file = f"""#include <pybind11/pybind11.h>\n#include <pybind11/stl.h>\nnamespace py = pybind11;\n{incls}\n""" + cppCode | cli.file(os.path.join(temp_dir, f"{moduleName}.cpp")) # compileCExt
    ext_modules = [Extension(moduleName, sources=[cpp_file], include_dirs=[pybind11.get_include()], language="c++", extra_compile_args=["-O3", "-std=c++17"])] # compileCExt
    class BuildExt(build_ext): # only tweak: make sure the output dir exists before building # compileCExt
        def run(self): build_ext.run(self)                                       # compileCExt
        def build_extension(self, ext): ext_path = self.get_ext_fullpath(ext.name); os.makedirs(os.path.dirname(ext_path), exist_ok=True); build_ext.build_extension(self, ext) # compileCExt
    # hides setuptools' compile chatter unless verbose; idenContext() is a no-op stand-in # compileCExt
    with (idenContext() if verbose else k1.captureStdout()):                     # compileCExt
        setup(name=moduleName, ext_modules=ext_modules, cmdclass={"build_ext": BuildExt}, script_args=["build_ext", "--inplace"], options={"build_ext": {"build_lib": temp_dir}}); so_file = temp_dir | cli.ls() | cli.grep("cpython") | cli.item() # compileCExt
    # loads the freshly-built shared object into this interpreter session and caches it # compileCExt
    spec = importlib.util.spec_from_file_location(moduleName, so_file); module = importlib.util.module_from_spec(spec); spec.loader.exec_module(module); _cextMods[moduleName] = module; return module # compileCExt
compileCExt.mods = _cextMods # compileCExt