Refactored
This commit is contained in:
parent
0e0ece77ea
commit
0f9a46a115
@ -7,14 +7,14 @@ collect information from the access log and put it into the database
|
||||
"""
|
||||
|
||||
re_remote_addr = r"[0-9a-fA-F.:]+"
|
||||
re_remote_visitor = ".*"
|
||||
re_remote_user = ".*"
|
||||
re_time_local = r"\[.+\]"
|
||||
re_request = r'"[^"]+"'
|
||||
re_status = r'\d+'
|
||||
re_body_bytes_sent = r'\d+'
|
||||
re_http_referer = r'"([^"]*)"'
|
||||
re_http_visitor_agent = r'"([^"]*)"'
|
||||
re_log_format: str = f'({re_remote_addr}) - ({re_remote_visitor}) ({re_time_local}) ({re_request}) ({re_status}) ({re_body_bytes_sent}) {re_http_referer} {re_http_visitor_agent}'
|
||||
re_http_user_agent = r'"([^"]*)"'
|
||||
re_log_format: str = f'({re_remote_addr}) - ({re_remote_user}) ({re_time_local}) ({re_request}) ({re_status}) ({re_body_bytes_sent}) {re_http_referer} {re_http_user_agent}'
|
||||
|
||||
def parse_log(logfile_path:str) -> list[Request]:
|
||||
"""
|
||||
@ -23,19 +23,21 @@ def parse_log(logfile_path:str) -> list[Request]:
|
||||
requests = []
|
||||
with open(logfile_path, "r") as file:
|
||||
lines = file.readlines()
|
||||
for line in lines:
|
||||
m = match(re_log_format, line)
|
||||
for i in range(len(lines)):
|
||||
m = match(re_log_format, lines[i])
|
||||
if m is None:
|
||||
warning(f"parse_log: Unmatched line: '{line}'")
|
||||
warning(f"parse_log: Could not match line {i:3}: '{lines[i]}'")
|
||||
continue
|
||||
# print(m.groups())
|
||||
g = m.groups()
|
||||
request_ = m.groups()[3].split(" ")
|
||||
if len(request_) != 3:
|
||||
warning(f"parse_log: len('{m.groups()[3]}'.split(' ')) is {len(request_)} and not 3")
|
||||
pdebug(f"parse_log: line {i:3} match groups:", m.groups(), lvl=4)
|
||||
# _ is user
|
||||
ip_address, _, timestamp, request_, status, bytes_sent, referer, user_agent = m.groups()
|
||||
request_parts = request_.split(" ")
|
||||
if len(request_parts) != 3:
|
||||
warning(f"parse_log: Could not parse request of line {i:3}: '{request_}'")
|
||||
continue
|
||||
requests.append(Request(ip_address=g[0], time_local=g[2],
|
||||
request_type=request_[0], request_route=request_[1], request_protocol=request_[2],
|
||||
status=g[4], bytes_sent=g[5], referer=g[6], user_agent=g[7]))
|
||||
http_function, route, protocol = request_parts
|
||||
requests.append(Request(ip_address=ip_address, time_local=timestamp,
|
||||
request_type=http_function, request_route=route, request_protocol=protocol,
|
||||
status=status, bytes_sent=bytes_sent, referer=referer, user_agent=user_agent))
|
||||
return requests
|
||||
|
||||
|
@ -5,7 +5,7 @@ from datetime import datetime as dt
|
||||
|
||||
from regina.utility.sql_util import sanitize, sql_select, sql_exists, sql_insert, sql_tablesize, sql_max
|
||||
from regina.utility.utility import pdebug, warning, pmessage
|
||||
from regina.utility.globals import visitor_agent_operating_systems, visitor_agent_browsers, settings
|
||||
from regina.utility.globals import user_agent_platforms, user_agent_browsers, settings
|
||||
|
||||
# abbreviated English month names in calendar order (index 0 = January)
months = ["Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]
|
||||
|
||||
@ -28,21 +28,21 @@ class Request:
|
||||
warning(f"Request:__init__: {e}")
|
||||
else:
|
||||
warning(f"Request:__init__: Could not match time: '{time_local}'")
|
||||
self.request_type = sanitize(request_type)
|
||||
self.request_route = sanitize(request_route)
|
||||
self.request_protocol = sanitize(request_protocol)
|
||||
self.status = sanitize(status)
|
||||
self.type = sanitize(request_type) # GET, POST, ...
|
||||
self.route = sanitize(request_route) # eg. /index.html
|
||||
self.protocol = sanitize(request_protocol) # eg. HTTP/1.1
|
||||
self.status = sanitize(status) # http status code
|
||||
self.bytes_sent = sanitize(bytes_sent)
|
||||
self.referer = sanitize(referer)
|
||||
self.user_agent = sanitize(user_agent)
|
||||
|
||||
def __repr__(self):
|
||||
return f"{self.ip_address} - {self.time_local} - {self.request_route} - {self.user_agent} - {self.status}"
|
||||
return f"{self.ip_address} - {self.time_local} - {self.route} - {self.user_agent} - {self.status}"
|
||||
|
||||
def get_platform(self):
|
||||
# for groups in findall(re_visitor_agent, visitor_agent):
|
||||
operating_system = ""
|
||||
for os in visitor_agent_operating_systems:
|
||||
for os in user_agent_platforms:
|
||||
if os in self.user_agent:
|
||||
operating_system = os
|
||||
break
|
||||
@ -50,7 +50,7 @@ class Request:
|
||||
|
||||
def get_browser(self):
|
||||
browser = ""
|
||||
for br in visitor_agent_browsers:
|
||||
for br in user_agent_browsers:
|
||||
if br in self.user_agent:
|
||||
browser = br
|
||||
break
|
||||
|
@ -1,8 +1,8 @@
|
||||
# from sys import path
|
||||
# print(f"{__file__}: __name__={__name__}, __package__={__package__}, sys.path[0]={path[0]}")
|
||||
from sys import exit
|
||||
from os import path
|
||||
from re import fullmatch
|
||||
from sys import exit, stderr
|
||||
from os import path, makedirs
|
||||
from re import fullmatch, Pattern
|
||||
|
||||
from regina.utility.globals import settings
|
||||
|
||||
@ -10,49 +10,73 @@ from regina.utility.globals import settings
|
||||
Various utilities
|
||||
"""
|
||||
|
||||
def is_whitelisted(val: str, whitelist: str|list[str]|None):
|
||||
def _fullmatch(val, regexp, match_none=True):
|
||||
"""
|
||||
Check if val fully matches regexp
|
||||
Regexp can be:
|
||||
None -> return match_none
|
||||
str
|
||||
re.Pattern
|
||||
list of the above, in which case True if returned if it matches any of the expressions in the list
|
||||
|
||||
"""
|
||||
if not regexp: return match_none
|
||||
if type(regexp) == str:
|
||||
if fullmatch(regexp, val):
|
||||
return True
|
||||
elif type(regexp) == list:
|
||||
for w in regexp:
|
||||
if _fullmatch(val, w):
|
||||
return True
|
||||
elif type(regexp) == Pattern:
|
||||
if not regexp.pattern: # if whitelist = re.compile('')
|
||||
return match_none
|
||||
elif fullmatch(regexp, val):
|
||||
return True
|
||||
else:
|
||||
warning(f"_fullmatch: Unsupported regexp type: {type(regexp)}")
|
||||
return False
|
||||
|
||||
def is_whitelisted(val: str, whitelist: str|Pattern|None|list) -> bool:
|
||||
"""
|
||||
Check if val is in a regex whitelist
|
||||
whitelist: regexp, list of regexp or None
|
||||
whitelist: regexp as str or compiled pattern or None, or a list of them
|
||||
if whitelist is None, always return True
|
||||
"""
|
||||
if not whitelist: return True
|
||||
if type(whitelist) == str:
|
||||
return fullmatch(whitelist, val)
|
||||
if type(whitelist) == list:
|
||||
for w in whitelist:
|
||||
if not fullmatch(w, val): return False
|
||||
return True
|
||||
wl = _fullmatch(val, whitelist)
|
||||
if not wl: pdebug(f"is_whitelisted: value='{val}' is not on whitelist: '{whitelist}'", lvl=4)
|
||||
return wl
|
||||
|
||||
def is_blacklisted(val: str, blacklist: str|list[str]|None):
|
||||
def is_blacklisted(val: str, blacklist: str|Pattern|None|list):
|
||||
"""
|
||||
Check if val is in a regex blacklist
|
||||
blacklist: regexp, list of regexp or None
|
||||
blacklist: regexp as str or compiled pattern or None, or a list of them
|
||||
if blacklist is None, always return False
|
||||
"""
|
||||
return not is_whitelisted(val, blacklist)
|
||||
bl = _fullmatch(val, blacklist, match_none=False)
|
||||
if bl: pdebug(f"is_blacklisted: value='{val}' is blacklisted: '{blacklist}'", lvl=4)
|
||||
return bl
|
||||
|
||||
|
||||
def pdebug(*args, **keys):
|
||||
if settings["debug"]: print(*args, **keys)
|
||||
def pdebug(*args, lvl=2, **keys):
    """
    Print a debug message if the configured debug level is high enough
    args and keys are passed on to print
    lvl: the message is only printed if settings["debug"]["debug_level"] >= lvl
    """
    if settings["debug"]["debug_level"] >= lvl: print(*args, **keys)
|
||||
|
||||
def warning(*w, **k):
|
||||
print("Warning:", *w, **k)
|
||||
print("Warning:", *w, file=stderr, **k)
|
||||
|
||||
def pmessage(*args, **keys):
    """
    Print a message unconditionally
    args and keys are passed on to print
    """
    print(*args, **keys)
|
||||
|
||||
def error(*arg):
|
||||
print("Error:", *arg)
|
||||
exit(1)
|
||||
def error(*args, errno: int=1, **k):
    """
    Print an error message prefixed with 'Error:' and exit the program
    args: message parts, passed on to print
    errno: exit code that is handed to exit
    k: further keyword arguments for print, 'file' defaults to stderr
    """
    # only a default: a hard-coded file=stderr would raise a TypeError if the caller passes 'file' as well
    k.setdefault("file", stderr)
    print("Error:", *args, **k)
    exit(errno)
|
||||
|
||||
def missing_arg_val(arg):
|
||||
print("Missing argument for", arg)
|
||||
exit(1)
|
||||
|
||||
def missing_arg(arg):
|
||||
print("Missing ", arg)
|
||||
exit(1)
|
||||
def dict_str(d: dict) -> str:
    """
    nicer string for dictionaries
    Every item is rendered as 'key:<TAB>value' on its own line.
    Newlines at the very beginning and end of the result are stripped.
    """
    # join instead of repeated +=, the final strip keeps the output identical to the line-by-line version
    return "".join(f"{k}:\t{v}\n" for k, v in d.items()).strip("\n")
|
||||
|
||||
|
||||
def get_filepath(filename, directories: list):
|
||||
@ -62,3 +86,9 @@ def get_filepath(filename, directories: list):
|
||||
if path.isfile(p):
|
||||
return p
|
||||
raise FileNotFoundError(f"{filename} not in {directories}")
|
||||
|
||||
def make_parent_dirs(p):
    """
    Make sure the directory that contains the path p exists, missing directories are created
    A p without a directory part (eg. 'file.txt') lies in the current directory, so nothing is created.
    """
    parent = path.dirname(p)
    # dirname is '' for a bare filename and makedirs('') would raise a FileNotFoundError
    if not parent or path.isdir(parent):
        return
    pdebug(f"make_parent_dirs: Making directory '{parent}'", lvl=2)
    # exist_ok: do not fail if the directory was created by someone else after the isdir check
    makedirs(parent, exist_ok=True)
|
||||
|
Loading…
Reference in New Issue
Block a user