# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
"""Browser automation tool. This is kinda like selenium, but way more awesome.
How it works is that I've developed a chrome extension that can communicate with
one of my servers, and functions here also communicate with it. After installing
the extension, you can open up a bunch of chrome windows, then using this module,
you can "attach" to a specific window. Then using methods provided here, you can
execute any random pieces of code as if you're in chrome's console.
This works already for some of my projects, but it takes too much time to document
everything, and I have so many other things to do, so if you're interested, ping
me at 157239q@gmail.com and I'll finalize this module. Some examples of what this
can do:
- https://mlexps.com/other/43-touhou/
- https://mlexps.com/other/44-gensokyo-days/
(yes, I'm still addicted to touhou and it's slowly destroying my life)"""
import k1lib, math, numpy as np, random, base64, json, time, asyncio, threading, traceback, html
from collections import defaultdict, deque; import k1lib.cli as cli; import k1lib.kws as kws
# Public names exported by this module
__all__ = ["newBrowser", "Browser", "Element", "Locator", "BrowserCancel", "BrowserGroup"]
# Generator for the "idx" field that Browser.send() attaches to every command. The
# prefix mixes a random number and the current time, presumably so that ids coming
# from different Python processes don't collide -- TODO confirm AutoIncrement semantics
autoMsgIdx = k1lib.AutoIncrement(prefix=f"_Python_{round(random.random()*1e9)}_{round(time.time())}_")
# Plumbing between newBrowser() and the bThread() worker:
# - _browserAutoIdx: hands out a ticket id for each browser creation request
# - _browserQueue: pending requests, appended by newBrowser(), drained by bThread()
# - _browserAnsD: ticket id -> finished Browser, filled by bThread(), emptied by newBrowser()
_browserAutoIdx = k1lib.AutoIncrement(prefix="_browser_"); _browserAnsD = dict(); _browserQueue = deque()
def bThread():
    """Worker that creates :class:`Browser` objects on behalf of :func:`newBrowser`.

    Sets up a private event loop for the calling thread and then polls
    ``_browserQueue`` forever. Every queued request is answered by constructing a
    :class:`Browser` inside that loop and storing it in ``_browserAnsD`` under the
    request's ticket id. This function never returns."""
    workerLoop = asyncio.new_event_loop()
    asyncio.set_event_loop(workerLoop)

    async def serve():
        while True:
            if _browserQueue:
                # Browser() schedules its handshake task, so it has to be built
                # while this loop is running
                ticket, _ = _browserQueue.popleft()
                _browserAnsD[ticket] = Browser()
            else:
                await asyncio.sleep(0.01)  # nothing to do, yield for a bit

    workerLoop.run_until_complete(serve())
# Launch the browser-creating worker right at import time. It's a daemon thread, so it
# won't keep the interpreter alive on exit.
threading.Thread(target=bThread, daemon=True).start()
# Register the "zircon" settings group. `ws_server` is the websocket endpoint that
# Browser connects to; `http_server` is registered here but not read in this module.
k1lib.settings.add("zircon", k1lib.Settings()
.add("http_server", "https://zircon.mlexps.com")
.add("ws_server", "wss://ws.zircon.mlexps.com"),
"from k1lib.zircon module");
from urllib.parse import urlparse # bThread
def newBrowser() -> "Browser":
    """Creates a new browser.

    Files a creation request with the background worker thread, then blocks the
    calling thread (polling every 10ms) until the worker has constructed the
    :class:`Browser`, and returns it."""
    ticket = _browserAutoIdx()
    _browserQueue.append([ticket, 0])
    while True:
        if ticket in _browserAnsD: break
        time.sleep(0.01)
    # hand the finished browser over and forget about the ticket
    return _browserAnsD.pop(ticket)
class Browser: # controlling over selen.mlexps.com
    """Python-side client that sends commands to a Chrome Extension through the
    zircon websocket server (``settings.zircon.ws_server``).

    The constructor schedules the connection handshake with
    :func:`asyncio.create_task`, so an instance has to be created while an event
    loop is running. Incoming replies are stored in ``self._msgs`` keyed by message
    idx, and :meth:`send` polls that dict for its answer."""
    def __init__(self):
        self._msgs = {}; self.ws = None; self._ext_ws_updated = False
        self.extId = None; self.__pageInfo = None; self.clientId = None
        self._destroyed = False; self._elements = []
        asyncio.create_task(self._handshake())

    def close(self):
        """Marks this browser as destroyed, which makes the receive loop stop
        after the next message it receives."""
        self._destroyed = True

    async def _handshake(self):
        """Asks the server for a client id, then starts the main connection."""
        async with kws.WsClient(f"{k1lib.settings.zircon.ws_server}/handshake/client") as ws:
            while True: # keeps asking until the server answers with "connect_res"
                asyncio.create_task(ws.send({"type": "connect"})); msg = await ws.recv()
                if msg["type"] == "connect_res": self.clientId = msg["clientId"]; break
        await self._start()

    async def _start(self):
        """Opens the per-client websocket and pumps incoming messages into
        ``self._msgs`` until :meth:`close` is called."""
        async with kws.WsClient(f"{k1lib.settings.zircon.ws_server}/connect/client/{self.clientId}") as ws:
            self.ws = ws
            while not self._destroyed:
                msg = await ws.recv()
                # notification that goto() waits for, not a reply to any command
                if msg["type"] == "ext_ws_updated": self._ext_ws_updated = True; continue
                self._msgs[msg["idx"]] = msg

    async def send(self, d) -> "res": # always return if successful. If not successful then will throw an error instead!
        """Sends a command and waits for its reply.

        :param d: command fields, merged into the outgoing message
        :return: the reply's ``chromeRes`` field if it has one, else the whole reply
        :raises Exception: if the reply has a falsy ``success`` field. The message
            is the reply's ``reason`` field, if any"""
        if self.ws is None: # connect guard
            print("Connecting to server...")
            while self.ws is None: await asyncio.sleep(0.01)
            print("Connected")
        msgIdx = autoMsgIdx(); payload = {"src": "Python", "type": "cmd", "idx": msgIdx, **d}
        await self.ws.send(payload); startTime = time.time()
        while msgIdx not in self._msgs:
            await asyncio.sleep(0.01) # resends if return message is not found for quite a while. kws module should have handled
            # this correctly, but there might be a case where websockets module declares that the message has passed through, but
            # it hasn't actually passed through yet. Sounds improbable, but seems like it could happen in some of my tests
            if time.time() - startTime > 20: await self.ws.send(payload); startTime = time.time()
        msg = self._msgs.pop(msgIdx)
        if "success" in msg and not msg["success"]: raise Exception(msg.get("reason", "Failure on Chrome extension, no reason logged"))
        if "chromeRes" in msg: return msg["chromeRes"]
        return msg

    async def scan(self, groupPath:"str|list[str]"=None):
        """Scans for all attached Extensions in the system.
        Example::

            b = zircon.newBrowser()
            await b.scan() # grab metadata about every Extension that's ready
            await b.scan("touhou") # grab metadata for Extensions in `touhou` group only
            await b.scan(["touhou", "mint2"]) # grab metadata for Extensions in `touhou` or `mint2` groups

        The result might look something like this:

        .. code-block:: text

            {'_ext_254175259_1701581542_5': {'basics': {'title': '',
               'url': 'https://zircon.mlexps.com/'},
              'tabInfo': {'tabId': 872280718,
               'data': {'extId': '_ext_254175259_1701581542_5', 'groupPath': 'touhou'}},
              'lastUpdated': 1701584077.7896328,
              'lastPing': 1701587171.1521692},
             '_ext_254175259_1701581542_11': {'basics': {'title': '',
               'url': 'https://zircon.mlexps.com/'},
              'tabInfo': {'tabId': 872280722,
               'data': {'extId': '_ext_254175259_1701581542_11', 'groupPath': 'touhou'}},
              'lastUpdated': 1701583562.7153254,
              'lastPing': 1701587171.2784846}}

        :param groupPath: (optional) If specified, only returns metadata for Extensions that
            have the specified group"""
        # only Extensions that have reported both "tabInfo" and "basics" are considered ready
        res = (await self.send({"cmd": "scan"}))["exts"].items() | cli.filt(lambda x: x.get("tabInfo", None) and x.get("basics", None), 1)
        if groupPath is None: return res | cli.toDict()
        # NOTE(review): group membership is read from tabInfo.bgData.config.groupPaths, which
        # doesn't appear in the sample output above -- the sample may be outdated, confirm
        if isinstance(groupPath, str): return res | cli.filt(lambda x: groupPath in x["tabInfo"]["bgData"]["config"]["groupPaths"], 1) | cli.toDict()
        else:
            def f(x): # keeps Extensions belonging to at least one of the requested groups
                gps = set(x["tabInfo"]["bgData"]["config"]["groupPaths"])
                return any(gp in gps for gp in groupPath)
            return res | cli.filt(f, 1) | cli.toDict()

    async def pickExt(self, extId:str):
        """Attaches this browser to the Extension with the specified id. Returns "ok"."""
        self.extId = extId; self.__pageInfo = None # cached page info belongs to the old Extension
        await self.send({"cmd": "pickExt", "extId": extId}); return "ok"

    async def pickExtFromGroup(self, groupName:str):
        """Attaches to an Extension taken from the scan results of the specified group"""
        return await self.pickExt((await self.scan(groupName)).keys() | cli.item())

    async def goto(self, url, timeout=15):
        """Navigates the attached page to the specified url.

        Typical times for this to be waiting for page change confirmation:

        - https://en.touhouwiki.net/wiki/Touhou_Wiki: 4.42s
        - https://www.google.com: 5s
        - https://mlexps.com: 5.2s
        - https://www.amazon.com: 6.2s
        - https://www.youtube.com: 9.4s

        Quite a distribution. So I figure 15s would be a reasonable middle ground

        :param url: url to navigate the page to
        :param timeout: will hang until received confirmation that the extension has been
            loaded on the new page. If has not received anything after this many seconds,
            will return regardless"""
        self._ext_ws_updated = False; res = await self.send({"cmd": "goto", "url": url})
        # wait until page has reloaded and extension reconnected, or `timeout` seconds have passed
        startTime = time.time()
        while not self._ext_ws_updated:
            await asyncio.sleep(0.1)
            if time.time() - startTime > timeout: print(f"{timeout}s up, breaking..."); break
        return res

    def _addE(self, d) -> "Element":
        """Wraps element metadata returned by the extension into an :class:`Element`"""
        return Element(self, {"mode": "localElems", "idx": d["idx"]}, d)
    # return e #self.elements.append(e); return e # was experimenting with notifying elements they're now invalid

    async def querySelector(self, selector:str) -> "Element":
        """Grabs the element the "querySelector" command returns for this css selector"""
        return self._addE(await self.send({"cmd": "querySelector", "selector": selector}))

    async def querySelectorAll(self, selector:str) -> "list[Element]":
        """Grabs all elements the "querySelectorAll" command returns for this css selector"""
        return [self._addE(d) for d in await self.send({"cmd": "querySelectorAll", "selector": selector})]

    async def window(self) -> "Element":
        """Grabs the element returned by the "window" command"""
        return self._addE(await self.send({"cmd": "window"}))

    async def document(self) -> "Element":
        """Grabs the element returned by the "document" command"""
        return self._addE(await self.send({"cmd": "document"}))

    async def locate(self, s:str) -> "list[Element]":
        """Locates text somewhere and returns plausible Elements"""
        return [self._addE(d) for d in await self.send({"cmd": "locate", "selector": None, "s": s}) if d["tag"].lower() != "body"]

    async def locate2(self, locator:"Locator", depth:int=20, width:int=100):
        """Sends the "locate2" command with the serialized :class:`Locator` and
        returns a dict of name -> :class:`Element`"""
        return {k:self._addE(v) for k,v in (await self.send({"cmd": "locate2", "selector": None, "locator": locator.json(), "depth": depth, "width": width})).items()}

    async def _pageInfo(self):
        """Returns (and caches) the scan metadata of the attached Extension,
        retrying until that metadata includes the page's url"""
        if self.__pageInfo: return self.__pageInfo
        while True:
            self.__pageInfo = (await self.scan())[self.extId]
            if "basics" in self.__pageInfo and "url" in self.__pageInfo["basics"]: return self.__pageInfo
            await asyncio.sleep(0.01) # has to be awaited, else this loop spins on scan() with no pause

    def _toLinks(self, f): return k1lib.resolve(self.querySelector("body")) | cli.toLinks(f)
    def __repr__(self): return f"<Browser extId={self.extId}>"
inf = float("inf")


class Element:
    def __init__(self, browser, selector:dict, extras:dict=None):
        """Represents a specific element in the current browser

        :param browser: the :class:`Browser` all commands are sent through
        :param selector: how the extension should find this element again. Sent
            verbatim as the "selector" field of every command
        :param extras: extra metadata, for nice displaying"""
        self.browser = browser; self.selector = selector; self.extras = extras

    @staticmethod
    def _splitChain(chain:str) -> "list[str]":
        """Turns ".style.color" into ["style", "color"], dropping empty segments"""
        return [e for e in chain.split(".") if e]

    async def value(self, chain:str): # like elem.value(".innerHTML"), or ".style.backgroundColor"
        """Gets the value of some property of the current element.
        Example::

            browser = ...
            # returns value of `document.querySelector("body").innerHTML`
            await (await browser.querySelector("body")).value(".innerHTML")
            # returns value of `document.querySelector("h1").style.color`
            await (await browser.querySelector("h1")).value(".style.color")

        :param chain: resolving chain to the property"""
        return await self.browser.send({"cmd": "elem_value", "selector": self.selector, "chain": self._splitChain(chain)})

    async def setValue(self, chain:str, value):
        """Sets value of element's properties

        See also: :meth:`value`

        :param chain: resolving chain to the property
        :param value: anything json-dumpable"""
        return await self.browser.send({"cmd": "elem_setValue", "selector": self.selector, "chain": self._splitChain(chain), "value": value})

    async def func(self, chain:str, args=None):
        """Executes any function on this element.
        Example::

            browser = ...
            await (await browser.querySelector("#someBtn")).func(".click")

        :param chain: resolving chain to the function
        :param args: tuple of json-dumpable objects"""
        return await self.browser.send({"cmd": "elem_func", "selector": self.selector, "chain": self._splitChain(chain), "args": [] if args is None else list(args)})

    async def inputText(self, value):
        """Input text to this element (assuming input box/text area). Also
        dispatches 'input' event to trigger many systems"""
        return await self.browser.send({"cmd": "elem_inputText", "selector": self.selector, "value": value})

    async def parent(self):
        """Grabs the direct parent of this element. Short, sweet and simple"""
        return self._addE(await self.browser.send({"cmd": "elem_parent", "selector": self.selector}))

    async def parentC(self, minWidth=0, minHeight=0, deltaX=0, deltaY=0, maxTries=30, takeAfter=True):
        """Grabs a nested parent element of this element that meets the specified conditions.
        "parentC" can be thought of as "complex parent". Example::

            await e.parent() # most straightforwardly, gets the immediate parent
            await e.parentC(minWidth=600, takeAfter=False)

        The second line requires some explaining. Let's say that there're these elements: A -> B -> C -> D -> E
        Assume E is the current element. Then the second line will recursively grabs parent elements, check if
        it's width is at least that width (in this case, say "B"), then returns the element B if takeAfter is True,
        else it returns element C.

        There're more selectors:

        - minWidth: grab parent that's at least this wide
        - minHeight: same with minWidth
        - deltaY: if positive, grab parent that has y' > y + deltaY (y is the current element's y). If
          negative, grab parent that has y' < y + deltaY
        - deltaX: same with deltaY

        These together with :meth:`childrenC` should help you to navigate around locally.

        :param minWidth: if specified, finds smallest parent that is bigger than this
        :param maxTries: the number of consecutive parents to try out if minWidth or minHeight is specified
        :param takeAfter: if True, take the parent bigger than the constraints, else take the parent just shy of that"""
        prevE = self; e = await self.parent()
        # own coordinates are only needed for the delta checks, so don't demand
        # "coords" metadata from elements that are filtered by size alone
        ogCoords = self.extras["coords"] if (deltaX or deltaY) else None
        for i in range(maxTries):
            if minWidth and (await e.value("clientWidth")) > minWidth: break
            if minHeight and (await e.value("clientHeight")) > minHeight: break
            if deltaX:
                x = e.extras["coords"]["x"]
                if deltaX > 0:
                    if ogCoords["x"] + deltaX < x: break
                else:
                    if ogCoords["x"] + deltaX > x: break
            if deltaY:
                y = e.extras["coords"]["y"]
                if deltaY > 0:
                    if ogCoords["y"] + deltaY < y: break
                else:
                    if ogCoords["y"] + deltaY > y: break
            prevE = e; e = await e.parent()
        return e if takeAfter else prevE

    async def children(self) -> "List[Element]":
        """Grabs all direct children of this element."""
        return [self._addE(e) for e in await self.browser.send({"cmd": "elem_children", "selector": self.selector})]

    async def childrenC(self, minWidth=0, minHeight=0, maxWidth=inf, maxHeight=inf) -> "List[Element]":
        """Recursively grabs all children, and returns all elements that's within the specified bounds.
        Note that if A is the parent of B, and both meets the conditions, then only A is returned."""
        ans = []
        for ch in await self.children():
            if "coords" not in ch.extras: continue
            c = ch.extras["coords"]; w = c["w"]; h = c["h"]
            if w < minWidth or h < minHeight: continue # too small, and so are its descendants presumably
            if w <= maxWidth and h <= maxHeight: ans.append(ch)
            else: ans.extend(await ch.childrenC(minWidth, minHeight, maxWidth, maxHeight)) # too big, look inside it
        return ans

    def _addE(self, e): return self.browser._addE(e)

    async def querySelector(self, selector:str) -> "Element":
        """Like :meth:`Browser.querySelector`, but sent with this element's selector"""
        return self._addE(await self.browser.send({"cmd": "elem_querySelector", "selector": self.selector, "querySelector": selector}))

    async def querySelectorAll(self, selector:str) -> "list[Element]":
        """Like :meth:`Browser.querySelectorAll`, but sent with this element's selector"""
        return [self._addE(d) for d in await self.browser.send({"cmd": "elem_querySelectorAll", "selector": self.selector, "querySelector": selector})]

    async def snake(self) -> str: # get uniquely identifying css selector (? not sure, long time ago)
        return await self.browser.send({"cmd": "snake", "selector": self.selector})

    async def locate(self, s:str) -> "list[Element]":
        """Locates text inside this element. Only candidates whose cached text or
        live ``textContent`` actually contain the text are returned"""
        ans = []; res = [self._addE(d) for d in await self.browser.send({"cmd": "locate", "selector": self.selector, "s": s})]
        for e in res:
            if s in e.extras["text"]: ans.append(e)
            elif s in await e.value("textContent"): ans.append(e)
        return ans

    async def locate2(self, locator:"Locator", depth=20, width=100):
        """Sends the "locate2" command for this element and returns a dict of
        name -> :class:`Element`"""
        # NOTE(review): this used to send "selector": None, same as Browser.locate2(), which
        # ignored the element entirely. Now sends this element's selector, like locate() does
        return {k:self._addE(v) for k,v in (await self.browser.send({"cmd": "locate2", "selector": self.selector, "locator": locator.json(), "depth": depth, "width": width})).items()}

    def _toLinks(self, f):
        url = k1lib.resolve(self.browser._pageInfo())["basics"]["url"]; htm = k1lib.resolve(self.value("innerHTML"))
        a = urlparse(url); baseUrl = f"{a.scheme}://{a.netloc}"
        # root-relative links get the site's origin prepended, fragment links get the page's url prepended
        return [htm] | cli.toLinks(f) | cli.apply(lambda x: f"{baseUrl}{x}" if x.startswith("/") else x)\
            | cli.apply(lambda x: f"{url}{x}" if x.startswith("#") else x) | cli.aS(set) | cli.sort(None, False) | cli.aS(list)

    def __repr__(self):
        try:
            if isinstance(self.selector, str): return f"<Element selector='{self.selector}' browser={self.browser.extId}/>"
            elif self.selector["mode"] == "localElems":
                d = self.extras
                if "coords" in d: return f"""<Element {d['tag']} id="{d['id']}" #child={d['nChildren']} class="{d['className']}" %parent={round(d['boundedCoords']['areaRatio']['parent']*100)}% %screen={round(d['boundedCoords']['areaRatio']['screen']*100)}% />"""
                else: return f"""<Element {d['tag']} id="{d['id']}" #child={d['nChildren']} class="{d['className']}" />"""
        except Exception: pass # missing metadata, fall back to the minimal form
        return f"<Element browser={self.browser.extId}/>"

    def _repr_html_(self):
        try:
            d = self.extras; im = ""
            if "coords" in d:
                s = 400/d["screen"]["w"]; p5 = k1lib.p5; p5.newSketch(d["screen"]["w"]*s+1, d["screen"]["h"]*s+1, flip=False)
                p5.stroke(0, 0, 255); p5.rect(0, 0, d["screen"]["w"]*s, d["screen"]["h"]*s)
                p5.stroke(0, 255, 0); p5.rect(d["parent"]["boundedCoords"]["x"]*s, d["parent"]["boundedCoords"]["y"]*s, d["parent"]["boundedCoords"]["w"]*s, d["parent"]["boundedCoords"]["h"]*s)
                p5.stroke(255, 0, 0); p5.rect(d["boundedCoords"]["x"]*s, d["boundedCoords"]["y"]*s, d["boundedCoords"]["w"]*s, d["boundedCoords"]["h"]*s)
                im = p5.img() | cli.toHtml(); im = f"Location on screen (blue - window, green - parent, red - element):<br>{im}"
            text = html.escape("\n".join([e for e in d["text"].split("\n") if e.strip()]))
            return f"""<pre>{html.escape(self.__repr__())}</pre>{im}<br>Text content:<pre style='padding: 10px'>{text}</pre>"""
        except Exception: pass # missing metadata or drawing trouble, fall back to plain repr
        return html.escape(self.__repr__())
class Locator:
    """Rough description of where an element sits and what it looks like, used
    by ``locate2()``. Locators can be nested to describe a whole layout.

    Positions are cell indices on a 10x10 grid laid over the parent (or the screen
    for the root locator): cell ``n`` is at row ``n//10``, column ``n%10``."""
    def __init__(self, name:str, topleft:int, bottomright:int, text:str="", tag:str="", klass:str="", nChildren:int=0):
        """
        :param name: name of this locator
        :param topleft: grid cell (0-99) of the top left corner
        :param bottomright: grid cell (0-99) of the bottom right corner
        :param text: text the element contains
        :param tag: html tag of the element
        :param klass: class names of the element
        :param nChildren: how many children the element is expected to have"""
        self.name = name
        self.topleft = topleft
        self.bottomright = bottomright
        self.text = text
        self.tag = tag
        self.klass = klass
        self.nChildren = nChildren
        self.children = []

    def addChild(self, child:"Locator"):
        """Nests another locator inside this one"""
        self.children.append(child)

    def json(self):
        """Serializes this locator and all of its children to a json-dumpable dict"""
        return {"name": self.name, "topleft": self.topleft, "bottomright": self.bottomright, "text": self.text, "tag": self.tag, "klass": self.klass, "nChildren": self.nChildren, "children": [c.json() for c in self.children]}

    @staticmethod
    def fromJson(d):
        """Rebuilds a locator tree from the output of :meth:`json`. The input
        dict is left untouched."""
        # filter instead of `del d["children"]`, so that the caller's dict isn't mutated
        fields = {k: v for k, v in d.items() if k != "children"}
        loc = Locator(**fields)
        for c in d.get("children", []): loc.addChild(Locator.fromJson(c))
        return loc

    @staticmethod
    def builder():
        """Interactively builds a locator by asking questions on stdin"""
        print("-"*50); name = input("What's the name of this Locator? ")
        print("-"*50); print(range(10**2) | cli.batched(10) | cli.pretty() | cli.join("\n")); print("Where is it roughly on the screen/relative to its parent?"); topleft = input("Top left: "); bottomright = input("Bottom right: ")
        print("-"*50); text = input("Does it contain any text? ")
        print("-"*50); tag = input("What html tag is it? ")
        print("-"*50); klass = input("Does it have any class names? ")
        print("-"*50); nChildren = input("How many children does it expect to have? 0-100: ")
        # nChildren is numeric like the two corners, and an empty answer means the default of 0
        return Locator(name, int(topleft), int(bottomright), text, tag, klass, int(nChildren) if nChildren.strip() else 0)

    def _plot(self,px1,py1,px2,py2):
        """Draws this locator and its children inside the parent box (px1,py1)-(px2,py2)"""
        p5 = k1lib.p5; topleft = self.topleft; bottomright = self.bottomright
        # grid cell -> fraction of the parent's box. The bottom right cell is inclusive, hence the +1
        top = (topleft//10)/10; bottom = (bottomright//10+1)/10
        left = (topleft%10)/10; right = (bottomright%10+1)/10
        x1 = (1-left)*px1 + left*px2; x2 = (1-right) *px1 + right *px2
        y1 = (1-top) *py1 + top *py2; y2 = (1-bottom)*py1 + bottom*py2
        p5.rect(x1,y1,x2-x1,y2-y1); p5.text(self.name, (x1+x2)/2, y2-5)
        for c in self.children: c._plot(x1,y1,x2,y2)

    def plot(self):
        """Sketches the locator tree on an 800px wide, 1920:1080 canvas and returns the image"""
        w = 800; h = w/1920*1080; p5 = k1lib.p5
        p5.newSketch(w, h, False); p5.background(250)
        self._plot(0, 0, w, h); return p5.img()
# Registers `settings.zircon.conflictDuration` (default 10). BrowserGroup._setup() only
# takes over Extensions whose "lastUpdated" timestamp is older than this many seconds
k1lib.settings.zircon.add("conflictDuration", 10, "How many seconds does the Extensions need to not take orders from other Python clients before our Python clients can take over? If too high, there won't be any free Extensions left, and if too low, there will be interference with other ppl")
class BrowserCancel(Exception):
    """Raise this inside the function given to :meth:`BrowserGroup.execute` to
    retire the current browser: it will not be scheduled again in that call."""
    pass


class BrowserGroup:
    def __init__(self, groupPath:"str|list[str]", limit:int=3):
        """Constructs a browser group. Has to be called while an event loop is
        running, as the setup is scheduled with :func:`asyncio.create_task`.

        :param groupPath: what group of browsers do you want to take control over?
        :param limit: only take over this many browsers"""
        self.groupPath = groupPath; self.limit = limit
        self._scanBrowser = newBrowser(); self._extIds = []; self._browsers = []
        self._setupFinished = False; self._setupError = None
        asyncio.create_task(self._setup())

    async def _setup(self):
        """Scans for idle Extensions in the group and attaches one new
        :class:`Browser` to each of them, up to ``limit``."""
        try:
            with k1lib.captureStdout(): scan = await self._scanBrowser.scan(self.groupPath)
            # only Extensions that haven't been updated for `conflictDuration` seconds are considered free
            extIds = scan.items() | cli.filt(lambda x: (time.time() - x["lastUpdated"]) > k1lib.settings.zircon.conflictDuration, 1) | cli.cut(0) | cli.deref()
            self._extIds = extIds | cli.head(self.limit) | cli.deref()
            # then have all the browsers pick the exts
            for extId in self._extIds:
                b = newBrowser()
                with k1lib.captureStdout(): await b.pickExt(extId)
                self._browsers.append(b)
        except Exception as e:
            # remember the failure so that execute() reports it, instead of waiting
            # forever on a setup that has already died
            self._setupError = e
        self._setupFinished = True

    async def _setupGuard(self):
        """Waits until :meth:`_setup` has finished, re-raising its error if it failed"""
        while not self._setupFinished: await asyncio.sleep(0.01)
        if self._setupError is not None: raise self._setupError

    async def execute(self, aFn, timeout=20):
        """Executes the specified async function repeatedly whenever a browser frees up.
        Example::

            linksToVisit = deque([
                'https://en.touhouwiki.net/wiki/Reimu_Hakurei',
                'https://en.touhouwiki.net/wiki/Marisa_Kirisame',
                'https://en.touhouwiki.net/wiki/Touhou_Project',
                'https://en.touhouwiki.net/wiki/Imperishable_Night',
                'https://en.touhouwiki.net/wiki/Perfect_Cherry_Blossom',
                'https://en.touhouwiki.net/wiki/Embodiment_of_Scarlet_Devil',
                'https://en.touhouwiki.net/wiki/Subterranean_Animism',
                'https://en.touhouwiki.net/wiki/Mountain_of_Faith',
                'https://en.touhouwiki.net/wiki/Phantasmagoria_of_Flower_View',
                'https://en.touhouwiki.net/wiki/Hakurei_Shrine',
                'https://en.touhouwiki.net/wiki/Touhou_Wiki:Projects',
                'https://en.touhouwiki.net/wiki/Yukari_Yakumo',
                'https://en.touhouwiki.net/wiki/Undefined_Fantastic_Object',
                'https://en.touhouwiki.net/wiki/Aya_Shameimaru',
                'https://en.touhouwiki.net/wiki/Sakuya_Izayoi',
                'https://en.touhouwiki.net/wiki/Immaterial_and_Missing_Power',
                'https://en.touhouwiki.net/wiki/Sanae_Kochiya'
            ])
            data = []
            async def crawl(b:"zircon.Browser"):
                # put here because it seems to resolve lots of problems that I have
                # when browser instances are scheduled too close together
                await asyncio.sleep(1)
                # if it seems like there're no more data to process, then throw zircon.BrowserCancel().
                # The current browser will never be scheduled while executing this function again
                if len(linksToVisit) == 0: raise zircon.BrowserCancel()
                url = linksToVisit.popleft()
                try: # do your normal web crawling stuff here
                    await b.goto(url)
                    title = await (await b.querySelector("title")).value("innerHTML")
                    # save data somewhere
                    data.append([url, title])
                except: linksToVisit.append(url) # try again later
            bg = zircon.BrowserGroup("public", 5)
            # bg = zircon.BrowserGroup(["public", "starcraft"], 5) # or can also be this
            await bg.execute(crawl)

        The last command will run the crawl function over and over again, as long as there's a free browser
        to do it. Also, by default, this will only use inactive browsers (no Python clients are sending them
        commands for a while, configurable at ``settings.zircon.conflictDuration``)

        Notice how I wrapped all browser interactions inside a try-except block? If some errors were to
        appear, like connection lost and the system is trying to restore the connection and you don't
        resolve it, .execute() will throw that same error and cancels all current tasks. So if you want
        to design something that will run for a long time, catch it and try to schedule the job for later

        :param aFn: async function to be executed
        :param timeout: if the function takes longer than this amount of time,
            then cancel the task and make the browser available in the future
            again. Can be None, but I'd advise against that
        :raises Exception: if ``aFn`` raised anything other than :class:`BrowserCancel`
            or a timeout. The original error is attached as ``__cause__``"""
        await self._setupGuard(); nCancelled = [0]; avaiBs = deque(list(self._browsers))
        n = len(self._browsers); errors = [None]; tasks = deque()
        async def inner(b): # runs aFn once, then decides what happens to the browser
            try:
                if timeout is None: await aFn(b); avaiBs.append(b)
                else: await asyncio.wait_for(aFn(b), timeout); avaiBs.append(b)
            except BrowserCancel: nCancelled[0] += 1 # retired, never goes back to the pool
            except asyncio.TimeoutError: avaiBs.append(b) # took too long, but the browser is reusable
            except Exception as e: errors[0] = [e, traceback.format_exc()]
            # print(f"Encountered error {e} while executing async function. Stopping BrowserGroup.execute(). Traceback:\n\n{traceback.format_exc()}")
        while True:
            while len(tasks) and tasks[0].done(): tasks.popleft() # cleaning up old tasks
            if errors[0]: # trouble! Cancel all current tasks and break
                for task in tasks: task.cancel()
                e, tb = errors[0]; raise Exception(f"Exception occured during BrowserGroup.execute(): {e}. Traceback:\n\n{tb}") from e
            print(f"\rExecuting. #browsers={n} #running={n-nCancelled[0]} #tasks={len(tasks)}     ", end="")
            try:
                if len(avaiBs) == 0:
                    # every browser has been retired, so there's nothing left to schedule
                    if nCancelled[0] >= len(self._browsers): print("Task finished"); break
                    await asyncio.sleep(0.1)
                else: tasks.append(asyncio.create_task(inner(avaiBs.popleft())))
            except asyncio.CancelledError: # execute() itself got cancelled, take the running tasks down too
                for task in tasks: task.cancel()
                break