# (removed scraped page-header residue: "132 lines / 3.9 KiB / Python")
import re

import requests
# Mute only urllib3's "unverified HTTPS request" warnings — we pass
# verify=False on purpose in RequestPage().
# NOTE(fix): the previous hack replaced requests.warnings.warn, but
# requests.warnings is the stdlib `warnings` module itself, so that
# silenced EVERY warning in the whole process, not just these.
requests.packages.urllib3.disable_warnings(
    requests.packages.urllib3.exceptions.InsecureRequestWarning
)
|
class ParseIsNotDoneException(Exception):
    """Raised when results are requested before any parsing has been run."""

    def __init__(self):
        message = "you should perform parsing first"
        super().__init__(message)
class FailedToParseException(Exception):
    """Raised when a page was fetched but its markup could not be parsed."""

    def __init__(self, url, errtext):
        details = f"cant parse web site \"{url}\"; error message: {errtext}"
        super().__init__(details)
class CantRequestPageException(Exception):
    """Raised when an HTTP request for a page fails or is rejected."""

    def __init__(self, url, errtext=None):
        # Start from the bare message and append a reason only when given.
        message = f"cant request page \"{url}\""
        if errtext:
            message = f"{message}; reason: {errtext}"
        super().__init__(message)
class WebSiteParser:
    """Base scraper for phone-number listing web sites.

    Subclasses are expected to fill ``self.Output`` (buckets
    "heavily_used", "almost_not_used", "not_used") and set
    ``self.ParseDone = True``; only then may ``GetResult()`` be called.
    """

    def __init__(self, url):
        # Placeholder used where a parsed field is absent.
        self.EMPTY = str(None)

        # Marker text of Cloudflare's JS-challenge page (compared lowercased).
        self.CFBlockText = "Enable JavaScript and cookies to continue".lower()

        # Plain-browser headers to reduce the chance of being blocked.
        self.DefaultHeaders = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; rv:102.0) Gecko/20100101 Firefox/102.0",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
            "Accept-Language": "en-US,en;q=0.5",
            "Accept-Encoding": "gzip, deflate, br",
            "Connection": "keep-alive",
            "Upgrade-Insecure-Requests": "1",
            "Sec-Fetch-Dest": "document",
            "Sec-Fetch-Mode": "navigate",
            #"Sec-Fetch-Site": "none",
            "Sec-Fetch-User": "?1"
        }

        # Single result number format:
        # {
        #     "number": "88005553535",
        #     "country": "USSR",
        #     "last_use": "11 milliseconds ago",
        #     "born": "9 eras ago",
        #     "url": "https://example.com/number/..."
        # }
        # Single number internal format:
        # {
        #     "number": "...",
        #     "country": "...",
        #     "times_used": 100500,
        #     "last_use_raw": "log(12451 ^ 76346 mod 54) * 420000 / 146 seconds ago",
        #     "born_raw": "2 << 2 | 1 eRaS aGo ayo dude"
        #     "uri_raw": "/numbers/..."
        # }
        self.Output = {
            "heavily_used": [],
            "almost_not_used": [],
            "not_used": []
        }
        self.ParseDone = False
        # Numbers used <= this many times are counted as "almost not used".
        self.AlmostNotUsedThreshold = 5
        self.ErrorsWhileParsing = 0

        self.WebSiteURL = url
        self.WebSiteFullURL = f"https://{url}/"

    def Cut(self, text, l=64):
        """Truncate *text* to its first l-1 characters plus an "..." suffix.

        Non-string input is converted with str() first; short text is
        returned unchanged.
        """
        if not isinstance(text, str):  # fix: idiomatic isinstance, not type() !=
            text = str(text)
        if len(text) > l:
            text = text[:l-1] + "..."
        return text

    def Log(self, text, src=None):
        """Write a log line to stdout, optionally tagged with its source."""
        if not src:
            print(f"{self.WebSiteURL} parser: {text}")
        else:
            print(f"{self.WebSiteURL} parser at {src}: {text}")

    def RequestPage(self, location=""):
        """GET the page at *location* relative to the site root.

        Returns the requests.Response on HTTP 200/302.
        Raises CantRequestPageException on connection errors, timeouts,
        Cloudflare blocks, or any other status code.
        """
        url = f"{self.WebSiteFullURL}{location}"
        try:
            resp = requests.get(
                url,
                headers=self.DefaultHeaders,
                # fix: without a timeout a dead host hangs the parser forever
                timeout=30,
                # deliberate: target sites often have broken TLS; the
                # resulting urllib3 warnings are muted at module level
                verify=False
            )
        except requests.exceptions.ConnectionError:
            raise CantRequestPageException(url, "cant connect, retries limit exceeded")
        except requests.exceptions.Timeout:
            # fix: previously a read timeout escaped as a raw requests error
            raise CantRequestPageException(url, "request timed out")
        if resp.status_code not in (200, 302):
            # Try to distinguish a Cloudflare block from a generic error.
            if resp.status_code == 403 and self.CFBlockText in resp.text.lower():
                raise CantRequestPageException(url, "blocked by cloudflare")
            elif resp.status_code == 403 and resp.headers.get("Server") == "cloudflare":
                raise CantRequestPageException(url, "seems like blocked by cloudflare")
            raise CantRequestPageException(url, f"status code is {resp.status_code}")
        else:
            return resp

    def ParseAllFromPage(self, exp, to_remove="", location=""):
        """Return every regex match of *exp* on the page at *location*,
        with anything matching *to_remove* stripped from each match.

        Non-string matches (e.g. tuples produced by regexes with several
        capture groups) are logged with a warning and skipped.
        """
        markup = self.RequestPage(location).text
        pieces_of_markup = re.findall(exp, markup)
        result = []
        for piece in pieces_of_markup:
            if not isinstance(piece, str):  # fix: idiomatic isinstance
                e = self.Cut(str(piece))
                self.Log(f"warning: unexpected result while parsing: {e}", f"ParseAllFromPage(self, \"{self.Cut(exp,32)}\", \"{self.Cut(to_remove,32)}\", \"{self.Cut(location,32)}\")")
                continue
            result.append(
                re.sub(to_remove, "", piece) if to_remove else piece
            )
        return result

    def GetResult(self):
        """Return the collected Output dict.

        Raises ParseIsNotDoneException if parsing has not completed yet.
        """
        if not self.ParseDone:
            raise ParseIsNotDoneException
        return self.Output