Source code for k1lib.zircon

# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
"""Browser automation tool. This is kinda like selenium, but way more awesome.

How it works is that I've developed a chrome extension that can communicate with
one of my servers, and functions here also communicate with it. After installing
the extension, you can open up a bunch of chrome windows, then using this module,
you can "attach" to a specific window. Then using methods provided here, you can
execute any random pieces of code as if you're in chrome's console.

This works already for some of my projects, but it takes too much time to document
everything, and I have so many other things to do, so if you're interested, ping
me at 157239q@gmail.com and I'll finalize this module. Some examples of what this
can do:

- https://mlexps.com/other/43-touhou/
- https://mlexps.com/other/44-gensokyo-days/

(yes, I'm still addicted to touhou and it's slowly destroying my life)"""
import k1lib, math, numpy as np, random, base64, json, time, asyncio, threading, traceback, html
from collections import defaultdict, deque; import k1lib.cli as cli; import k1lib.kws as kws
__all__ = ["newBrowser", "Browser", "Element", "Locator", "BrowserCancel", "BrowserGroup"]
# Generator of message ids; the prefix mixes a random number and the current
# time, both evaluated once when the module is imported.
autoMsgIdx = k1lib.AutoIncrement(
    prefix=f"_Python_{round(random.random()*1e9)}_{round(time.time())}_")
# Ids for browser-creation requests handed to bThread().
_browserAutoIdx = k1lib.AutoIncrement(prefix="_browser_")
# Finished Browser objects keyed by request id, filled in by bThread().
_browserAnsD = {}
# Pending [request id, 0] pairs appended by newBrowser().
_browserQueue = deque()
def bThread():
    """Serve browser-creation requests forever on a private event loop.

    Creates a new asyncio event loop, installs it as the current loop and
    then polls ``_browserQueue``: each queued request id gets a freshly
    constructed :class:`Browser`, stored in ``_browserAnsD`` under that id.
    When the queue is empty the loop sleeps for 10 ms before polling again.
    This function never returns."""
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    async def serve():
        while True:
            if _browserQueue:
                # Only the request id matters; the second field is ignored.
                requestId, _ = _browserQueue.popleft()
                _browserAnsD[requestId] = Browser()
            else:
                await asyncio.sleep(0.01)

    loop.run_until_complete(serve())
# Start the browser-creation loop (bThread) in a daemon thread as soon as the
# module is imported.
threading.Thread(target=bThread, daemon=True).start()
# Register the "zircon" settings group with the HTTP and websocket server
# endpoints; the rest of the module reads them as k1lib.settings.zircon.<name>.
k1lib.settings.add("zircon", k1lib.Settings()
                   .add("http_server", "https://zircon.mlexps.com")
                   .add("ws_server", "wss://ws.zircon.mlexps.com"),
    "from k1lib.zircon module");
from urllib.parse import urlparse                                                # bThread
def newBrowser() -> "Browser":
    """Creates a new browser.

    Queues a creation request for the background thread running
    :func:`bThread` and blocks the calling thread (polling every 10 ms) until
    the matching :class:`Browser` shows up in ``_browserAnsD``.

    :return: the newly constructed :class:`Browser`"""
    idx = _browserAutoIdx()
    _browserQueue.append([idx, 0])
    while idx not in _browserAnsD:
        time.sleep(0.01)
    # Hand the instance over and drop it from the answer table in one step.
    return _browserAnsD.pop(idx)
class Browser:  # controlling over selen.mlexps.com
    """Python-side client that relays commands to a Chrome extension through
    the websocket server configured at ``k1lib.settings.zircon.ws_server``.

    The constructor schedules the handshake with :func:`asyncio.create_task`,
    so it must be called while an asyncio event loop is running."""

    def __init__(self):
        self._msgs = {}               # replies received so far, keyed by message idx
        self.ws = None                # command socket, set by _start() once connected
        self._ext_ws_updated = False  # set when an "ext_ws_updated" message arrives
        self.extId = None             # extension id chosen through pickExt()
        self.__pageInfo = None        # cached scan() entry for extId
        self.clientId = None          # assigned by the server during the handshake
        self._destroyed = False       # set by close(); ends the receive loop
        self._elements = []
        # Keep a reference to the task: the event loop only holds weak
        # references, so an unreferenced task can be garbage collected early.
        self._handshakeTask = asyncio.create_task(self._handshake())

    def close(self):
        """Flags this browser as destroyed. The receive loop in ``_start``
        checks the flag before waiting for each message."""
        self._destroyed = True

    async def _handshake(self):
        """Obtains a client id from the handshake endpoint, then starts the
        receive loop."""
        async with kws.WsClient(f"{k1lib.settings.zircon.ws_server}/handshake/client") as ws:
            while True:
                # Sent as a task (not awaited) exactly like before; the local
                # variable keeps the task referenced while we wait for a reply.
                sendTask = asyncio.create_task(ws.send({"type": "connect"}))
                msg = await ws.recv()
                if msg["type"] == "connect_res":
                    self.clientId = msg["clientId"]
                    break
        # NOTE(review): indentation was lost in this copy of the file; this call
        # is placed after the handshake socket is closed - confirm against the
        # source notebook.
        await self._start()

    async def _start(self):
        """Opens the command socket and stores every incoming reply in
        ``self._msgs`` until :meth:`close` is called."""
        async with kws.WsClient(f"{k1lib.settings.zircon.ws_server}/connect/client/{self.clientId}") as ws:
            self.ws = ws
            while not self._destroyed:
                msg = await ws.recv()
                if msg["type"] == "ext_ws_updated":
                    self._ext_ws_updated = True
                    continue
                self._msgs[msg["idx"]] = msg

    async def send(self, d) -> "res":  # always return if successful. If not successful then will throw an error instead!
        """Sends command ``d`` and waits for the reply carrying the same idx.

        :param d: dict of command fields, merged into the outgoing message
        :return: ``msg["chromeRes"]`` if the reply has that key, else the whole reply
        :raises Exception: if the reply has a falsy ``success`` field"""
        if self.ws is None:  # connect guard
            print("Connecting to server...")
            while self.ws is None:
                await asyncio.sleep(0.01)
            print("Connected")
        msgIdx = autoMsgIdx()

        def payload():
            return {"src": "Python", "type": "cmd", "idx": msgIdx, **d}

        await self.ws.send(payload())
        startTime = time.time()
        while msgIdx not in self._msgs:
            await asyncio.sleep(0.01)
            # resends if return message is not found for quite a while. kws module should have handled
            # this correctly, but there might be a case where websockets module declares that the message
            # has passed through, but it hasn't actually passed through yet. Sounds improbable, but seems
            # like it could happen in some of my tests
            if time.time() - startTime > 20:
                await self.ws.send(payload())
                startTime = time.time()
        msg = self._msgs.pop(msgIdx)
        if "success" in msg and not msg["success"]:
            raise Exception(msg.get("reason", "Failure on Chrome extension, no reason logged"))
        if "chromeRes" in msg:
            return msg["chromeRes"]
        return msg

    async def scan(self, groupPath:"str|list[str]"=None):
        """Scans for all attached Extensions in the system. Example::

            b = zircon.newBrowser()
            await b.scan()                    # grab metadata about every Extension that's ready
            await b.scan("touhou")            # grab metadata for Extensions in `touhou` group only
            await b.scan(["touhou", "mint2"]) # grab metadata for Extensions in `touhou` or `mint2` groups

        The result might look something like this:

        .. code-block:: text

            {'_ext_254175259_1701581542_5': {'basics': {'title': '', 'url': 'https://zircon.mlexps.com/'},
              'tabInfo': {'tabId': 872280718,
               'data': {'extId': '_ext_254175259_1701581542_5', 'groupPath': 'touhou'}},
              'lastUpdated': 1701584077.7896328,
              'lastPing': 1701587171.1521692},
             '_ext_254175259_1701581542_11': {'basics': {'title': '', 'url': 'https://zircon.mlexps.com/'},
              'tabInfo': {'tabId': 872280722,
               'data': {'extId': '_ext_254175259_1701581542_11', 'groupPath': 'touhou'}},
              'lastUpdated': 1701583562.7153254,
              'lastPing': 1701587171.2784846}}

        :param groupPath: (optional) If specified, only returns metadata for Extensions that have the specified group"""
        # Only keep extensions that report both "tabInfo" and "basics".
        res = (await self.send({"cmd": "scan"}))["exts"].items() | cli.filt(lambda x: x.get("tabInfo", None) and x.get("basics", None), 1)
        if groupPath is None:
            return res | cli.toDict()
        # NOTE(review): the sample output above shows tabInfo.data.groupPath while
        # the filters below read tabInfo.bgData.config.groupPaths - the sample may
        # be outdated; confirm against the extension.
        if isinstance(groupPath, str):
            return res | cli.filt(lambda x: groupPath in x["tabInfo"]["bgData"]["config"]["groupPaths"], 1) | cli.toDict()

        def inAnyGroup(x):
            gps = set(x["tabInfo"]["bgData"]["config"]["groupPaths"])
            return any(gp in gps for gp in groupPath)
        return res | cli.filt(inAnyGroup, 1) | cli.toDict()

    async def pickExt(self, extId:str):
        """Selects the Extension this browser sends its commands to and drops
        the cached page info. Returns ``"ok"``."""
        self.extId = extId
        self.__pageInfo = None
        await self.send({"cmd": "pickExt", "extId": extId})
        return "ok"

    async def pickExtFromGroup(self, groupName:str):
        """Scans the given group and picks one of the Extensions found."""
        return await self.pickExt((await self.scan(groupName)).keys() | cli.item())

    async def goto(self, url, timeout=15):
        """Navigates the page to ``url``.

        Typical times for this to be waiting for page change confirmation:

        - https://en.touhouwiki.net/wiki/Touhou_Wiki: 4.42s
        - https://www.google.com: 5s
        - https://mlexps.com: 5.2s
        - https://www.amazon.com: 6.2s
        - https://www.youtube.com: 9.4s

        Quite a distribution. So I figure 15s would be a reasonable middle ground

        :param url: url to navigate the page to
        :param timeout: will hang until received confirmation that the extension has
            been loaded on the new page. If has not received anything after this many
            seconds, will return regardless"""
        self._ext_ws_updated = False
        res = await self.send({"cmd": "goto", "url": url})
        # wait until page has reloaded and extension reconnected, or <timeout> seconds have passed
        startTime = time.time()
        while not self._ext_ws_updated:
            await asyncio.sleep(0.1)
            if time.time() - startTime > timeout:
                print(f"{timeout}s up, breaking...")
                break
        return res

    def _addE(self, d) -> "Element":
        """Wraps an element description returned by the extension into an
        :class:`Element` addressed by its ``idx``."""
        return Element(self, {"mode": "localElems", "idx": d["idx"]}, d)

    async def querySelector(self, selector:str) -> "Element":
        """Sends a ``querySelector`` command and wraps the reply in an Element."""
        return self._addE(await self.send({"cmd": "querySelector", "selector": selector}))

    async def querySelectorAll(self, selector:str) -> "list[Element]":
        """Sends a ``querySelectorAll`` command and wraps each reply entry in an Element."""
        return [self._addE(d) for d in await self.send({"cmd": "querySelectorAll", "selector": selector})]

    async def window(self) -> "Element":
        """Returns an Element for the reply of the ``window`` command."""
        return self._addE(await self.send({"cmd": "window"}))

    async def document(self) -> "Element":
        """Returns an Element for the reply of the ``document`` command."""
        return self._addE(await self.send({"cmd": "document"}))

    async def locate(self, s:str) -> "list[Element]":
        """Locates text somewhere and returns plausible Elements.
        Results whose tag is ``body`` are left out."""
        return [self._addE(d) for d in await self.send({"cmd": "locate", "selector": None, "s": s}) if d["tag"].lower() != "body"]

    async def locate2(self, locator:"Locator", depth:int=20, width:int=100):
        """Sends a ``locate2`` command built from ``locator.json()``.

        :return: dict mapping each key of the reply to an :class:`Element`"""
        return {k:self._addE(v) for k,v in (await self.send({"cmd": "locate2", "selector": None, "locator": locator.json(), "depth": depth, "width": width})).items()}

    async def _pageInfo(self):
        """Returns (and caches) the scan entry of the picked Extension, polling
        until that entry contains ``basics.url``."""
        if self.__pageInfo:
            return self.__pageInfo
        while True:
            self.__pageInfo = (await self.scan())[self.extId]
            if "basics" in self.__pageInfo and "url" in self.__pageInfo["basics"]:
                return self.__pageInfo
            # The original forgot the `await`, so the sleep coroutine was never
            # run and this retry loop did not pause between scans.
            await asyncio.sleep(0.01)

    async def scrollDown(self, timeout=120, step=3000, sleep=5):
        """Scrolls <step> pixels down continuously every <sleep> seconds, until can't, or time exceeds <timeout>"""
        window = await self.window()
        startTime = time.time()
        newY = float("-inf")
        while time.time() - startTime < timeout:
            await window.func("scrollBy", [0, step])
            await asyncio.sleep(sleep)
            oldY = newY
            newY = await window.value("scrollY")
            if newY <= oldY:  # scrollY stopped growing, so we hit the bottom
                break

    def _toLinks(self, f):
        # Resolves the <body> element and pipes it into cli.toLinks.
        return k1lib.resolve(self.querySelector("body")) | cli.toLinks(f)

    def __repr__(self):
        return f"<Browser extId={self.extId}>"
inf = float("inf")  # module-level shorthand for positive infinity (an unbounded limit)
class Element:
    """Handle to one element living in the page controlled by a :class:`Browser`."""

    def __init__(self, browser, selector:dict, extras:dict=None):
        """Represents a specific element in the current browser

        :param browser: the :class:`Browser` all commands are sent through
        :param selector: how the extension should find this element again
        :param extras: extra metadata, for nice displaying"""
        self.browser = browser
        self.selector = selector
        self.extras = extras

    @staticmethod
    def _splitChain(chain:str) -> list:
        # ".style.color" -> ["style", "color"]; empty segments are dropped
        return [e for e in chain.split(".") if e]

    async def value(self, chain:str):  # like elem.value(".innerHTML"), or ".style.backgroundColor"
        """Gets the value of some property of the current element. Example::

            browser = ...
            # returns value of `document.querySelector("body").innerHTML`
            await (await browser.querySelector("body")).value(".innerHTML")
            # returns value of `document.querySelector("h1").style.color`
            await (await browser.querySelector("h1")).value(".style.color")

        :param chain: resolving chain to the property"""
        return await self.browser.send({"cmd": "elem_value", "selector": self.selector, "chain": self._splitChain(chain)})

    async def setValue(self, chain:str, value):
        """Sets value of element's properties

        See also: :meth:`value`

        :param chain: resolving chain to the property
        :param value: anything json-dumpable"""
        return await self.browser.send({"cmd": "elem_setValue", "selector": self.selector, "chain": self._splitChain(chain), "value": value})

    async def func(self, chain:str, args=None):
        """Executes any function on this element. Example::

            browser = ...
            await (await browser.querySelector("#someBtn")).func(".click")

        :param chain: resolving chain to the function
        :param args: tuple of json-dumpable objects"""
        return await self.browser.send({"cmd": "elem_func", "selector": self.selector, "chain": self._splitChain(chain), "args": [] if args is None else list(args)})

    async def inputText(self, value):
        """Input text to this element (assuming input box/text area). Also
        dispatches 'input' event to trigger many systems"""
        return await self.browser.send({"cmd": "elem_inputText", "selector": self.selector, "value": value})

    async def parent(self):
        """Grabs the direct parent of this element. Short, sweet and simple"""
        return self._addE(await self.browser.send({"cmd": "elem_parent", "selector": self.selector}))

    async def parentC(self, minWidth=0, minHeight=0, deltaX=0, deltaY=0, maxTries=30, takeAfter=True):
        """Grabs a nested parent element of this element that meets the specified
        conditions. "parentC" can be thought of as "complex parent". Example::

            await e.parent() # most straightforwardly, gets the immediate parent
            await e.parentC(minWidth=600, takeAfter=False)

        The second line requires some explaining. Let's say that there're these elements:

            A -> B -> C -> D -> E

        Assume E is the current element. Then the second line will recursively grab
        parent elements, check if its width is at least that width (in this case, say
        "B"), then returns the element B if takeAfter is True, else it returns element C.

        There're more selectors:

        - minWidth: grab parent that's at least this wide
        - minHeight: same with minWidth
        - deltaY: if positive, grab parent that has y' > y + deltaY (y is the current
          element's y). If negative, grab parent that has y' < y + deltaY
        - deltaX: same with deltaY

        These together with :meth:`childrenC` should help you to navigate around locally.

        :param minWidth: if specified, finds smallest parent that is bigger than this
        :param maxTries: the number of consecutive parents to try out if minWidth or minHeight is specified
        :param takeAfter: if True, take the parent bigger than the constraints, else take the parent just shy of that"""
        prevE = self
        e = await self.parent()
        for _ in range(maxTries):
            if minWidth and (await e.value("clientWidth")) > minWidth: break
            if minHeight and (await e.value("clientHeight")) > minHeight: break
            # Coordinates are only looked up when a delta is requested, so
            # width/height-only queries also work on elements without "coords".
            if deltaX:
                limitX = self.extras["coords"]["x"] + deltaX
                x = e.extras["coords"]["x"]
                if (limitX < x) if deltaX > 0 else (limitX > x): break
            if deltaY:
                limitY = self.extras["coords"]["y"] + deltaY
                y = e.extras["coords"]["y"]
                if (limitY < y) if deltaY > 0 else (limitY > y): break
            prevE = e
            e = await e.parent()
        return e if takeAfter else prevE

    async def children(self) -> "List[Element]":
        """Grabs all direct children of this element."""
        return [self._addE(e) for e in await self.browser.send({"cmd": "elem_children", "selector": self.selector})]

    async def childrenC(self, minWidth=0, minHeight=0, maxWidth=float("inf"), maxHeight=float("inf")) -> "List[Element]":
        """Recursively grabs all children, and returns all elements that's within
        the specified bounds. Note that if A is the parent of B, and both meets the
        conditions, then only A is returned."""
        ans = []
        for ch in await self.children():
            if "coords" not in ch.extras: continue
            c = ch.extras["coords"]
            w = c["w"]; h = c["h"]
            if w < minWidth or h < minHeight: continue  # too small, and so is everything inside it
            if w <= maxWidth and h <= maxHeight:
                ans.append(ch)
            else:  # too big: look for fitting elements among its descendants
                ans.extend(await ch.childrenC(minWidth, minHeight, maxWidth, maxHeight))
        return ans

    def _addE(self, e):
        return self.browser._addE(e)

    async def querySelector(self, selector:str) -> "Element":
        """Sends ``elem_querySelector`` for this element and wraps the reply in an Element."""
        return self._addE(await self.browser.send({"cmd": "elem_querySelector", "selector": self.selector, "querySelector": selector}))

    async def querySelectorAll(self, selector:str) -> "list[Element]":
        """Sends ``elem_querySelectorAll`` for this element and wraps each reply entry in an Element."""
        return [self._addE(d) for d in await self.browser.send({"cmd": "elem_querySelectorAll", "selector": self.selector, "querySelector": selector})]

    async def snake(self) -> str:  # get uniquely identifying css selector (? not sure, long time ago)
        return await self.browser.send({"cmd": "snake", "selector": self.selector})

    async def locate(self, s:str) -> "list[Element]":
        """Asks the extension to locate ``s`` under this element, then keeps only
        the candidates whose cached text or live ``textContent`` contains ``s``."""
        ans = []
        res = [self._addE(d) for d in await self.browser.send({"cmd": "locate", "selector": self.selector, "s": s})]
        for e in res:
            if s in e.extras["text"]: ans.append(e)
            elif s in await e.value("textContent"): ans.append(e)
        return ans

    async def locate2(self, locator:"Locator", depth=20, width=100):
        """Sends a ``locate2`` command built from ``locator.json()``.

        :return: dict mapping each key of the reply to an :class:`Element`"""
        # NOTE(review): unlike locate(), this sends "selector": None instead of
        # self.selector, so the request does not reference this element - confirm
        # whether that is intended.
        return {k:self._addE(v) for k,v in (await self.browser.send({"cmd": "locate2", "selector": None, "locator": locator.json(), "depth": depth, "width": width})).items()}

    def _toLinks(self, f):
        url = k1lib.resolve(self.browser._pageInfo())["basics"]["url"]
        htm = k1lib.resolve(self.value("innerHTML"))
        a = urlparse(url)
        baseUrl = f"{a.scheme}://{a.netloc}"
        # "/path" links are prefixed with the site root, "#fragment" links with the page url
        return [htm] | cli.toLinks(f) | cli.apply(lambda x: f"{baseUrl}{x}" if x.startswith("/") else x)\
            | cli.apply(lambda x: f"{url}{x}" if x.startswith("#") else x) | cli.aS(set) | cli.sort(None, False) | cli.aS(list)

    def __repr__(self):
        try:
            if isinstance(self.selector, str):
                return f"<Element selector='{self.selector}' browser={self.browser.extId}/>"
            elif self.selector["mode"] == "localElems":
                d = self.extras
                if "coords" in d:
                    return f"""<Element {d['tag']} id="{d['id']}" #child={d['nChildren']} class="{d['className']}" %parent={round(d['boundedCoords']['areaRatio']['parent']*100)}% %screen={round(d['boundedCoords']['areaRatio']['screen']*100)}% />"""
                else:
                    return f"""<Element {d['tag']} id="{d['id']}" #child={d['nChildren']} class="{d['className']}" />"""
        except Exception:
            # Best effort: incomplete metadata falls back to the short form below.
            # Not a bare except, so KeyboardInterrupt/SystemExit still propagate.
            pass
        return f"<Element browser={self.browser.extId}/>"

    def _repr_html_(self):
        try:
            d = self.extras
            im = ""
            if "coords" in d:
                s = 400/d["screen"]["w"]  # scale so that the window is drawn 400 wide
                p5 = k1lib.p5
                p5.newSketch(d["screen"]["w"]*s+1, d["screen"]["h"]*s+1, flip=False)
                p5.stroke(0, 0, 255); p5.rect(0, 0, d["screen"]["w"]*s, d["screen"]["h"]*s)
                p5.stroke(0, 255, 0); p5.rect(d["parent"]["boundedCoords"]["x"]*s, d["parent"]["boundedCoords"]["y"]*s, d["parent"]["boundedCoords"]["w"]*s, d["parent"]["boundedCoords"]["h"]*s)
                p5.stroke(255, 0, 0); p5.rect(d["boundedCoords"]["x"]*s, d["boundedCoords"]["y"]*s, d["boundedCoords"]["w"]*s, d["boundedCoords"]["h"]*s)
                im = p5.img() | cli.toHtml()
                im = f"Location on screen (blue - window, green - parent, red - element):<br>{im}"
            # blank lines are removed from the text preview
            text = html.escape("\n".join([e for e in d["text"].split("\n") if e.strip()]))
            return f"""<pre>{html.escape(self.__repr__())}</pre>{im}<br>Text content:<pre style='padding: 10px'>{text}</pre>"""
        except Exception:
            # Best effort, same as __repr__: fall back to the escaped plain repr.
            pass
        return html.escape(self.__repr__())
class Locator:
    """Tree-shaped description of an element, sent to the extension by the
    ``locate2`` methods.

    ``topleft`` and ``bottomright`` are cell numbers 0-99 on a 10x10 grid laid
    over the parent's rectangle (row = number // 10, column = number % 10)."""

    def __init__(self, name:str, topleft:int, bottomright:int, text:str="", tag:str="", klass:str="", nChildren:int=0):
        self.name = name
        self.topleft = topleft
        self.bottomright = bottomright
        self.text = text
        self.tag = tag
        self.klass = klass
        self.nChildren = nChildren
        self.children = []

    def addChild(self, child:"Locator"):
        """Appends a nested locator."""
        self.children.append(child)

    def json(self):
        """Returns this locator and all nested ones as plain dicts and lists."""
        return {"name": self.name, "topleft": self.topleft, "bottomright": self.bottomright,
                "text": self.text, "tag": self.tag, "klass": self.klass,
                "nChildren": self.nChildren, "children": [c.json() for c in self.children]}

    @staticmethod
    def fromJson(d):
        """Rebuilds a locator tree from the output of :meth:`json`.

        The input dict is left untouched (the previous implementation deleted
        its ``"children"`` entry, recursively). A missing ``"children"`` key is
        treated as no children.

        :param d: dict shaped like the output of :meth:`json`"""
        fields = dict(d)  # shallow copy so the caller's dict is not modified
        children = fields.pop("children", [])
        loc = Locator(**fields)
        for c in children:
            loc.addChild(Locator.fromJson(c))
        return loc

    @staticmethod
    def builder():
        """Interactively asks for every field on stdin and returns the Locator."""
        print("-"*50); name = input("What's the name of this Locator? ")
        print("-"*50); print(range(10**2) | cli.batched(10) | cli.pretty() | cli.join("\n"))
        print("Where is it roughly on the screen/relative to its parent?")
        topleft = input("Top left: "); bottomright = input("Bottom right: ")
        print("-"*50); text = input("Does it contain any text? ")
        print("-"*50); tag = input("What html tag is it? ")
        print("-"*50); klass = input("Does it have any class names? ")
        print("-"*50); nChildren = input("How many children does it expect to have? 0-100: ")
        # nChildren used to be passed on as the raw input string; convert it like
        # the other numeric answers. An empty answer means the default of 0.
        nChildren = int(nChildren) if nChildren.strip() else 0
        return Locator(name, int(topleft), int(bottomright), text, tag, klass, nChildren)

    def _plot(self, px1, py1, px2, py2):
        """Draws this locator inside the parent rectangle (px1, py1)-(px2, py2),
        then its children inside its own rectangle."""
        p5 = k1lib.p5; topleft = self.topleft; bottomright = self.bottomright
        # fractions (0..1) of the parent rectangle covered by the chosen grid cells
        top  = (topleft//10)/10; bottom = (bottomright//10+1)/10
        left = (topleft%10)/10;  right  = (bottomright%10+1)/10
        x1 = (1-left)*px1 + left*px2; x2 = (1-right) *px1 + right *px2
        y1 = (1-top) *py1 + top *py2; y2 = (1-bottom)*py1 + bottom*py2
        p5.rect(x1, y1, x2-x1, y2-y1); p5.text(self.name, (x1+x2)/2, y2-5)
        for c in self.children: c._plot(x1, y1, x2, y2)

    def plot(self):
        """Renders the locator tree on an 800-wide, 16:9 sketch and returns the image."""
        w = 800; h = w/1920*1080; p5 = k1lib.p5
        p5.newSketch(w, h, False); p5.background(250)
        self._plot(0, 0, w, h); return p5.img()
# Registers settings.zircon.conflictDuration (default 10 seconds); BrowserGroup._setup
# compares it against each Extension's "lastUpdated" age when choosing Extensions.
k1lib.settings.zircon.add("conflictDuration", 10, "How many seconds does the Extensions need to not take orders from other Python clients before our Python clients can take over? If too high, there won't be any free Extensions left, and if too low, there will be interference with other ppl")
class BrowserCancel(Exception):
    """Raise this inside the function passed to :meth:`BrowserGroup.execute` to
    stop that browser from being scheduled again during the current call."""
class BrowserGroup:
    """Takes over several Extensions of a group and runs an async function on
    whichever of them is free."""

    def __init__(self, groupPath:"str|list[str]", limit:int=3):
        """Constructs a browser group.

        Must be called while an asyncio event loop is running, because the
        setup is scheduled with :func:`asyncio.create_task`.

        :param groupPath: what group of browsers do you want to take control over?
        :param limit: only take over this many browsers"""
        self.groupPath = groupPath
        self.limit = limit
        self._scanBrowser = newBrowser()
        self._extIds = []
        self._browsers = []
        self._setupFinished = False
        # Keep a reference so the task can't be garbage collected mid-flight and
        # so that _setupGuard() can surface a failed setup.
        self._setupTask = asyncio.create_task(self._setup())

    async def _setup(self):
        """Picks up to ``limit`` Extensions that have been idle for longer than
        ``settings.zircon.conflictDuration`` and attaches one Browser to each."""
        with k1lib.captureStdout():
            scan = await self._scanBrowser.scan(self.groupPath)
        extIds = scan.items() | cli.filt(lambda x: (time.time() - x["lastUpdated"]) > k1lib.settings.zircon.conflictDuration, 1) | cli.cut(0) | cli.deref()
        self._extIds = extIds | cli.head(self.limit) | cli.deref()
        # then have all the browsers pick the exts
        for extId in self._extIds:
            b = newBrowser()
            with k1lib.captureStdout():
                await b.pickExt(extId)
            self._browsers.append(b)
        self._setupFinished = True

    async def _setupGuard(self):
        """Waits until :meth:`_setup` has finished. If the setup task ended
        without finishing (it raised or was cancelled), that error is re-raised
        here instead of waiting forever."""
        while not self._setupFinished:
            task = getattr(self, "_setupTask", None)
            if task is not None and task.done():
                task.result()  # raises the setup's exception
            await asyncio.sleep(0.01)

    async def execute(self, aFn, timeout=20):
        """Executes the specified async function repeatedly whenever a browser
        frees up. Example::

            linksToVisit = deque([
                'https://en.touhouwiki.net/wiki/Reimu_Hakurei',
                'https://en.touhouwiki.net/wiki/Marisa_Kirisame',
                'https://en.touhouwiki.net/wiki/Touhou_Project',
                'https://en.touhouwiki.net/wiki/Imperishable_Night',
                'https://en.touhouwiki.net/wiki/Perfect_Cherry_Blossom',
                'https://en.touhouwiki.net/wiki/Embodiment_of_Scarlet_Devil',
                'https://en.touhouwiki.net/wiki/Subterranean_Animism',
                'https://en.touhouwiki.net/wiki/Mountain_of_Faith',
                'https://en.touhouwiki.net/wiki/Phantasmagoria_of_Flower_View',
                'https://en.touhouwiki.net/wiki/Hakurei_Shrine',
                'https://en.touhouwiki.net/wiki/Touhou_Wiki:Projects',
                'https://en.touhouwiki.net/wiki/Yukari_Yakumo',
                'https://en.touhouwiki.net/wiki/Undefined_Fantastic_Object',
                'https://en.touhouwiki.net/wiki/Aya_Shameimaru',
                'https://en.touhouwiki.net/wiki/Sakuya_Izayoi',
                'https://en.touhouwiki.net/wiki/Immaterial_and_Missing_Power',
                'https://en.touhouwiki.net/wiki/Sanae_Kochiya'
            ])
            data = []

            async def crawl(b:"zircon.Browser"):
                # put here because it seems to resolve lots of problems that I have
                # when browser instances are scheduled too close together
                await asyncio.sleep(1)
                # if it seems like there're no more data to process, then throw zircon.BrowserCancel().
                # The current browser will never be scheduled while executing this function again
                if len(linksToVisit) == 0: raise zircon.BrowserCancel()
                url = linksToVisit.popleft()
                try:
                    # do your normal web crawling stuff here
                    await b.goto(url)
                    title = await (await b.querySelector("title")).value("innerHTML")
                    # save data somewhere
                    data.append([url, title])
                except: linksToVisit.append(url) # try again later

            bg = zircon.BrowserGroup("public", 5)
            # bg = zircon.BrowserGroup(["public", "starcraft"], 5) # or can also be this
            await bg.execute(crawl)

        The last command will run the crawl function over and over again, as long as
        there's a free browser to do it. Also, by default, this will only use inactive
        browsers (no Python clients are sending them commands for a while, configurable
        at ``settings.zircon.conflictDuration``)

        Notice how I wrapped all browser interactions inside a try-except block? If some
        errors were to appear, like connection lost and the system is trying to restore
        the connection and you don't resolve it, .execute() will throw that same error
        and cancels all current tasks. So if you want to design something that will run
        for a long time, catch it and try to schedule the job for later

        :param aFn: async function to be executed
        :param timeout: if the function takes longer than this amount of time, then
            cancel the task and make the browser available in the future again.
            Can be None, but I'd advise against that"""
        await self._setupGuard()
        n = len(self._browsers)
        avaiBs = deque(self._browsers)  # browsers currently free to take a job
        tasks = deque()                 # running jobs, oldest first
        nCancelled = [0]                # browsers retired through BrowserCancel
        errors = [None]                 # first unexpected [exception, traceback text]

        async def inner(b):
            try:
                if timeout is None: await aFn(b)
                else: await asyncio.wait_for(aFn(b), timeout)
                avaiBs.append(b)
            except BrowserCancel: nCancelled[0] += 1       # browser is not put back
            except asyncio.TimeoutError: avaiBs.append(b)  # took too long, browser is reusable
            except Exception as e: errors[0] = [e, traceback.format_exc()]

        while True:
            while len(tasks) and tasks[0].done(): tasks.popleft()  # cleaning up old tasks
            if errors[0]:  # trouble! Cancel all current tasks and break
                for task in tasks: task.cancel()
                e, tb = errors[0]
                raise Exception(f"Exception occured during BrowserGroup.execute(): {e}. Traceback:\n\n{tb}")
            print(f"\rExecuting. #browsers={n} #running={n-nCancelled[0]} #tasks={len(tasks)} ", end="")
            try:
                if len(avaiBs) == 0:
                    # every browser has been retired, so there's nothing left to schedule
                    if nCancelled[0] >= len(self._browsers): print("Task finished"); break
                    await asyncio.sleep(0.1)
                else:
                    tasks.append(asyncio.create_task(inner(avaiBs.popleft())))
            except asyncio.CancelledError:
                # execute() itself got cancelled: take the running jobs down too
                for task in tasks: task.cancel()
                break