changed structure, added cfg
This commit is contained in:
@ -43,6 +43,13 @@ sudo chmod +x /usr/share/zsh/site-functions/_regina
# Changelog
## 2.0
- Refactored database code
- New database format:
- Removed filegroups table
- Put referrer, browser and platform in their own tables to reduce the size of the database
## 1.0
- Initial release
@ -1,4 +1,5 @@
"""Gather analytics from nginx access logs and visualize them through generated images and a generated html"""
# __package__ = 'regina'
from regina.db_operation import database, visualize, collect
from regina.data_collection import parse_log
from regina import database
@ -1,7 +1,5 @@
import sqlite3 as sql
from re import fullmatch, match
from regina.db_operation.database import t_request, t_visitor, t_file, t_filegroup, t_ip_range, database_tables, get_filegroup, ip_range_id
from regina.utility.sql_util import sanitize, sql_select, sql_exists, sql_insert, sql_tablesize, sql_max
from regina.data_collection.request import Request
from regina.utility.utility import pdebug, warning, pmessage
@ -18,12 +16,12 @@ re_http_referer = r'"([^"]*)"'
re_http_visitor_agent = r'"([^"]*)"'
re_log_format: str = f'({re_remote_addr}) - ({re_remote_visitor}) ({re_time_local}) ({re_request}) ({re_status}) ({re_body_bytes_sent}) {re_http_referer} {re_http_visitor_agent}'
def parse_log(logfile:str) -> list[Request]:
def parse_log(logfile_path:str) -> list[Request]:
create Request objects from each line in the logfile
requests = []
with open(logfile, "r") as file:
with open(logfile_path, "r") as file:
lines = file.readlines()
for line in lines:
m = match(re_log_format, line)
@ -37,7 +35,7 @@ def parse_log(logfile:str) -> list[Request]:
warning(f"parse_log: len('{m.groups()[3]}'.split(' ')) is {len(request_)} and not 3")
requests.append(Request(ip_address=g[0], time_local=g[2],
request_type=request_[0], request_file=request_[1], request_protocol=request_[2],
status=g[4], bytes_sent=g[5], referer=g[6], visitor_agent=g[7]))
request_type=request_[0], request_route=request_[1], request_protocol=request_[2],
status=g[4], bytes_sent=g[5], referer=g[6], user_agent=g[7]))
return requests
@ -3,14 +3,14 @@ from time import mktime
from re import fullmatch, match
from datetime import datetime as dt
from .utility.sql_util import sanitize, sql_select, sql_exists, sql_insert, sql_tablesize, sql_max
from .utility.utility import pdebug, warning, pmessage
from .utility.globals import visitor_agent_operating_systems, visitor_agent_browsers, settings
from regina.utility.sql_util import sanitize, sql_select, sql_exists, sql_insert, sql_tablesize, sql_max
from regina.utility.utility import pdebug, warning, pmessage
from regina.utility.globals import visitor_agent_operating_systems, visitor_agent_browsers, settings
# three-letter english month abbreviations, in calendar order
months = ["Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]
class Request:
def __init__(self, ip_address="", time_local="", request_type="", request_file="", request_protocol="", status="", bytes_sent="", referer="", visitor_agent=""):
def __init__(self, ip_address="", time_local="", request_type="", request_route="", request_protocol="", status="", bytes_sent="", referer="", user_agent=""):
self.ip_address = int(IPv4Address(sanitize(ip_address)))
self.time_local = 0
# turn [20/Nov/2022:00:47:36 +0100] to unix time
@ -29,21 +29,21 @@ class Request:
warning(f"Request:__init__: Could not match time: '{time_local}'")
self.request_type = sanitize(request_type)
self.request_route = sanitize(request_file)
self.request_route = sanitize(request_route)
self.request_protocol = sanitize(request_protocol)
self.status = sanitize(status)
self.bytes_sent = sanitize(bytes_sent)
self.referer = sanitize(referer)
self.visitor_agent = sanitize(visitor_agent)
self.user_agent = sanitize(user_agent)
def __repr__(self):
return f"{self.ip_address} - {self.time_local} - {self.request_route} - {self.visitor_agent} - {self.status}"
return f"{self.ip_address} - {self.time_local} - {self.request_route} - {self.user_agent} - {self.status}"
def get_platform(self):
# for groups in findall(re_visitor_agent, visitor_agent):
operating_system = ""
for os in visitor_agent_operating_systems:
if os in self.visitor_agent:
if os in self.user_agent:
operating_system = os
return operating_system
@ -51,12 +51,12 @@ class Request:
def get_browser(self):
browser = ""
for br in visitor_agent_browsers:
if br in self.visitor_agent:
if br in self.user_agent:
browser = br
return browser
def get_mobile(self):
return "Mobi" in self.visitor_agent
return "Mobi" in self.user_agent
Normal file
Normal file
@ -0,0 +1 @@
"""Visualization utility for regina"""
Normal file
Normal file
@ -0,0 +1,151 @@
from re import fullmatch
from regina.database import Database
from regina.utility.globals import settings
from regina.utility.utility import pdebug, warning, missing_arg, is_blacklisted, is_whitelisted
from regina.data_visualization.utility import is_valid_status, cleanup_referer
def get_route_ranking(db: Database, date_condition:str) -> list[tuple[int, str]]:
    """
    Rank the routes by the number of requests that match date_condition.

    Routes rejected by settings["route_ranking_blacklist"] or settings["route_ranking_whitelist"] are skipped.
    If settings["route_ranking_ignore_404"] is set, routes without any successful request are skipped as well.
    @param date_condition: sql condition, inserted verbatim into the WHERE clause
    @returns [(request_count, route name)], sorted by request_count in ascending order
    """
    ranking = []
    for (route_id, name) in db("SELECT route_id, name FROM route"):
        if is_blacklisted(name, settings["route_ranking_blacklist"]): continue
        if not is_whitelisted(name, settings["route_ranking_whitelist"]): continue
        if settings["route_ranking_ignore_404"]:  # use only successful routes
            success = False
            # every row is a tuple, so the single column has to be unpacked before it is compared
            for (status,) in db(f"SELECT status FROM request WHERE route_id = {route_id}"):
                if is_valid_status(status):
                    pdebug(f"get_route_ranking: success code {status} for route with route_id {route_id} and name {name}")
                    success = True
                    break  # one successful request is enough
            if not success:
                pdebug(f"get_route_ranking: route with route_id {route_id} and name {name} has only requests resulting in error")
                continue
        db.execute(f"SELECT COUNT(*) FROM request WHERE route_id = {route_id} AND {date_condition}")
        ranking.append((db.fetchone()[0], name))
    # sort by count only, so that the highest ranked routes are at the end of the list
    ranking.sort(key=lambda entry: entry[0])
    return ranking
def get_ranking(db: Database, table: str, field_name: str, date_condition:str, whitelist_regex: str|list[str]|None=None, blacklist_regex: str|list[str]|None=None) -> list[tuple[int, str]]:
    """
    1) get all the distinct entries for field_name in table that match date_condition
    2) skip the entry if it is blacklisted by blacklist_regex
    3) skip the entry if it is not whitelisted by whitelist_regex
    4) for every remaining entry, get the count in table matching date_condition
    5) sort by count in ascending order
    @param date_condition: sql condition, inserted verbatim into the WHERE clause
    @returns [(count, name)]
    """
    ranking = []
    # every row is a tuple, so the single column has to be unpacked
    for (name,) in db(f"SELECT DISTINCT {field_name} FROM {table} WHERE {date_condition}"):
        if is_blacklisted(name, blacklist_regex): continue
        if not is_whitelisted(name, whitelist_regex): continue
        # double the single quotes so that a value containing ' can not terminate the sql string literal
        escaped_name = str(name).replace("'", "''")
        db.execute(f"SELECT COUNT(*) FROM {table} WHERE {field_name} = '{escaped_name}' AND {date_condition}")
        ranking.append((db.fetchone()[0], name))
    # sort by count only, names are not compared
    ranking.sort(key=lambda entry: entry[0])
    return ranking
def cleanup_referer_ranking(referer_ranking: list[tuple[int, str]]):
    """
    Merge all entries whose referers are equal after cleanup_referer() and sum up their counts.

    The list is modified in place and sorted by count in ascending order afterwards.
    @param referer_ranking: [(count, referer)]
    """
    unique_referers = {}
    for count, referer in referer_ranking:
        referer = cleanup_referer(referer)
        unique_referers[referer] = unique_referers.get(referer, 0) + count
    # replace the content instead of appending, otherwise the raw entries would remain next to the merged ones
    merged = [(count, referer) for referer, count in unique_referers.items()]
    # merging changes the counts, so the order has to be re-established
    merged.sort(key=lambda entry: entry[0])
    referer_ranking[:] = merged
def get_city_and_country_ranking(db: Database, require_humans=True):
    """
    Count the visitors per city and per country.

    Cities and countries are filtered with the city_ranking_* and country_ranking_* black- and whitelists from settings.
    @param require_humans: if True, only visitors with is_human = 1 are counted
    @returns [(count, "city (CO)")], [(count, country)], both sorted by count in ascending order
    """
    cities_dict = {}   # city -> [count, country code, country]
    country_dict = {}  # country -> count
    # NOTE(review): the selected columns are assumed to be city name, country code and country name - confirm against sql/create_db.sql
    sql_cmd = "SELECT ci.name, co.code, co.name FROM country AS co, city as ci, visitor as v, ip_range as i WHERE v.ip_range_id = i.ip_range_id AND i.city_id = ci.city_id AND ci.country_id = co.country_id"
    if require_humans: sql_cmd += " AND v.is_human = 1"
    for (city, country_code, country) in db(sql_cmd):
        if city in cities_dict:
            cities_dict[city][0] += 1
        elif not is_blacklisted(city, settings["city_ranking_blacklist"]) and is_whitelisted(city, settings["city_ranking_whitelist"]):
            cities_dict[city] = [1, country_code, country]
        # the country is counted independently of the city filters, a filtered city must not hide its country
        if country in country_dict:
            country_dict[country] += 1
        elif not is_blacklisted(country, settings["country_ranking_blacklist"]) and is_whitelisted(country, settings["country_ranking_whitelist"]):
            country_dict[country] = 1
    city_ranking = [(v[0], f"{city} ({v[1]})") for city, v in cities_dict.items()]
    country_ranking = [(count, country) for country, count in country_dict.items()]
    city_ranking.sort(key=lambda entry: entry[0])
    country_ranking.sort(key=lambda entry: entry[0])
    return city_ranking, country_ranking
def get_platform_browser_mobile_rankings(db: Database, visitor_ids: list[int]) -> tuple[list[tuple[int, str]], list[tuple[int, str]], float]:
    """
    Determine the share of each platform and browser and the share of mobile visitors.

    Visitors without platform_id and browser_id are not taken into account for the mobile share.
    @param visitor_ids: the visitors to consider
    @returns [(share in percent, platform name)], [(share in percent, browser name)], mobile visitor percentage
    """
    platform_counts = {}  # platform_id -> number of visitors
    browser_counts = {}   # browser_id -> number of visitors
    mobile_counts = {True: 0, False: 0}
    for visitor_id in visitor_ids:
        rows = db(f"SELECT platform_id, browser_id, is_mobile FROM visitor WHERE visitor_id = {visitor_id}")
        if not rows:  # unknown visitor_id, nothing to count
            warning(f"get_platform_browser_mobile_rankings: no visitor with visitor_id {visitor_id}")
            continue
        platform_id, browser_id, is_mobile = rows[0]
        if platform_id:
            platform_counts[platform_id] = platform_counts.get(platform_id, 0) + 1
        if browser_id:
            browser_counts[browser_id] = browser_counts.get(browser_id, 0) + 1
        if platform_id or browser_id:
            mobile_counts[bool(is_mobile)] += 1
    try:
        mobile_visitor_percentage = mobile_counts[True] / (mobile_counts[True] + mobile_counts[False])
    except ZeroDivisionError:  # no visitor had a platform or browser
        mobile_visitor_percentage = 0.0
    # the totals can not be zero inside the comprehensions, since an empty dict yields no iteration
    platform_count = sum(platform_counts.values())
    browser_count = sum(browser_counts.values())
    platform_ranking = [(c * 100/platform_count, db.get_name("platform", p_id)) for p_id, c in platform_counts.items()]
    browser_ranking = [(c * 100/browser_count, db.get_name("browser", b_id)) for b_id, c in browser_counts.items()]
    return platform_ranking, browser_ranking, mobile_visitor_percentage*100
# Store ranking in results class and dump with pickle
# class Results:
# def __init__(self, timespan_name,
# r_routes: list[tuple[int, str]],
# r_referrers: list[tuple[int, str]],
# r_platforms: list[tuple[int, str]],
# r_browsers: list[tuple[int, str]],
# r_cities: list[tuple[int, str]],
# r_countries: list[tuple[int, str]],
# ):
# self.r_routes = r_routes
# self.r_referrers= r_referrers
# self.r_platforms= r_platforms
# self.r_browsers = r_browsers
# self.r_cities = r_cities
# self.r_countries= r_countries
Normal file
Normal file
@ -0,0 +1,110 @@
from re import fullmatch

from regina.database import Database
from regina.utility.globals import settings
from regina.utility.sql_util import sanitize, sql_tablesize
from regina.utility.utility import pdebug, warning, missing_arg
# re_uri_protocol = f"(https?)://"
re_uri_protocol = f"(https?://)?"
re_uri_ipv4 = r"(?:(?:(?:\d{1,3}\.?){4})(?::\d+)?)"
# re_uri_ipv6 = ""
re_uri_domain = r"(?:([^/]+\.)*[^/]+\.[a-zA-Z]{2,})"
re_uri_route = r"(?:/(.*))?"
re_uri_full = f"{re_uri_protocol}({re_uri_domain}|{re_uri_ipv4})({re_uri_route})"
# (https?://)?((?:([^/]+\.)*[^/]+\.[a-zA-Z]{2,})|(?:(?:(?:\d{1,3}\.?){4})(?::\d+)?))((?:/(.*))?)
def cleanup_referer(referer: str) -> str:
    """
    split the referer uri into its parts and reassemble them depending on settings

    The referer_ranking_ignore_tld, _ignore_subdomain, _ignore_protocol and _ignore_route settings
    decide which parts are kept.
    @returns the cleaned up referer, or the unchanged referer if it does not match re_uri_full
    """
    m = fullmatch(re_uri_full, referer)
    if not m:
        warning(f"cleanup_referer: Could not match referer '{referer}'")
        return referer
    # pdebug(f"cleanup_referer: {referer} - {m.groups()}")
    groups = m.groups()
    # the protocol and the subdomain groups are optional and None when they did not take part in the match
    protocol = groups[0] or ""
    subdomains = groups[2] or ""
    domain = groups[1].replace(subdomains, "")
    route = groups[3]

    referer = domain
    if settings["referer_ranking_ignore_tld"]:
        if len(domain.split(".")) == 2:  # if domain.tld
            referer = domain.split(".")[0]
    if not settings["referer_ranking_ignore_subdomain"]: referer = subdomains + referer
    if not settings["referer_ranking_ignore_protocol"]: referer = protocol + referer
    if not settings["referer_ranking_ignore_route"]: referer += route
    # pdebug(f"cleanup_referer: cleaned up: {referer}")
    return referer
def _date_condition(value, operator: str, arg_name: str) -> str:
    """
    build the condition for a single date argument, terminated with ' AND '
    @returns the condition, or "" (after printing a warning) if value is neither a str nor a number
    """
    if isinstance(value, str):  # date string, compared with the date of the unix time
        return f"DATE(time, 'unixepoch') {operator} '{sanitize(value)}' AND "
    if isinstance(value, (int, float)):  # unix time
        return f"time {operator} {int(value)} AND "
    print(f"WARNING: get_where_date_str: Invalid type of argument {arg_name}: {type(value)}")
    return ""


def get_where_date_str(at_date=None, min_date=None, max_date=None):
    """
    get a condition string that sets a condition on the time

    Each date can be a date string, which is compared with DATE(time, 'unixepoch'), or a unix time (int or float).
    @returns the conditions joined with ' AND ', or 'time > 0' if no condition could be generated
    """
    s = ""
    for value, operator, arg_name in ((at_date, "=", "at_date"), (min_date, ">=", "min_date"), (max_date, "<=", "max_date")):
        if value is not None:
            s += _date_condition(value, operator, arg_name)
    if s == "":
        print(f"WARNING: get_where_date_str: no date_str generated. Returning 'time > 0'. at_date={at_date}, min_date={min_date}, max_date={max_date}")
        return "time > 0"
    return s.removesuffix(" AND ")
def is_valid_status(status: int):
    """
    check whether a http status code counts as success
    @returns False for status >= 400, True for status < 300, and settings["status_300_is_success"] decides for everything in between
    """
    if status >= 400:
        return False
    redirect_is_success = settings["status_300_is_success"]
    return status < 300 or bool(redirect_is_success)
def get_unique_visitor_ids_for_date(db: Database, date:str) -> list[int]:
    """
    @param date: sql condition, inserted verbatim into the WHERE clause
    @returns the distinct visitor_ids of all requests matching date
    """
    rows = db(f"SELECT DISTINCT visitor_id FROM request WHERE {date}")
    visitor_ids = []
    for row in rows:
        visitor_ids.append(row[0])
    return visitor_ids
def append_human_visitors(db: Database, unique_visitor_ids, unique_visitor_ids_human: list):
    """
    for visitor in unique_visitor_ids:
        if human -> append to unique_visitor_ids_human
    @param unique_visitor_ids_human: list that is extended in place
    """
    for visitor_id in unique_visitor_ids:
        db.execute(f"SELECT is_human FROM visitor WHERE visitor_id = {visitor_id}")
        row = db.fetchone()
        # no row means the visitor_id is unknown, such a visitor is not counted as human
        if row is not None and row[0] == 1:
            unique_visitor_ids_human.append(visitor_id)
def get_unique_request_ids_for_date(db: Database, date_constraint:str):
    """
    @param date_constraint: sql condition, inserted verbatim into the WHERE clause
    @returns the distinct request_ids of all requests matching date_constraint
    """
    rows = db(f"SELECT DISTINCT request_id FROM request WHERE {date_constraint}")
    request_ids = []
    for row in rows:
        request_ids.append(row[0])
    return request_ids
def append_unique_request_ids_for_date_and_visitor(db: Database, date_constraint:str, visitor_id: int, unique_request_ids_human: list):
    """append all unique requests for visitor_id at date_constraint to unique_request_ids_human"""
    for request_id in db(f"SELECT DISTINCT request_id FROM request WHERE {date_constraint} AND visitor_id = {visitor_id}"):
        # every row is a tuple with the request_id as only element
        unique_request_ids_human.append(request_id[0])
# get number of requests per day
def get_request_count_for_date(db: Database, date_constraint:str) -> int:
    """
    @param date_constraint: sql condition, inserted verbatim into the WHERE clause
    @returns the number of requests matching date_constraint
    """
    query = f"SELECT COUNT(*) FROM request WHERE {date_constraint}"
    db.execute(query)
    row = db.fetchone()
    return row[0]
def get_unique_visitor_count(db: Database) -> int:
    """
    @returns the result of sql_tablesize for the visitor table (presumably its row count - confirm in sql_util)
    """
    # sql_tablesize comes from regina.utility.sql_util and has to be imported at the top of this file
    return sql_tablesize(db.cur, "visitor")
Normal file
Normal file
@ -0,0 +1,365 @@
# from sys import path
# print(f"{__file__}: __name__={__name__}, __package__={__package__}, sys.path[0]={path[0]}")
import sqlite3 as sql
from sys import exit
from re import fullmatch
import matplotlib.pyplot as plt
from os.path import isdir
from datetime import datetime as dt
from numpy import empty
# local
from regina.database import Database
from regina.utility.sql_util import sanitize, sql_select, sql_exists, sql_insert, sql_tablesize, sql_get_count_where
from regina.utility.utility import pdebug, warning, missing_arg
from regina.utility.globals import settings
from regina.data_visualization.utility import cleanup_referer, get_where_date_str, get_unique_visitor_ids_for_date, get_unique_request_ids_for_date, append_human_visitors, append_unique_request_ids_for_date_and_visitor
from regina.data_visualization.ranking import get_city_and_country_ranking, get_platform_browser_mobile_rankings, get_ranking, cleanup_referer_ranking, get_route_ranking
visualize information from the database
# base colors used by all plots
palette = {
    "red": "#ee4035",
    "orange": "#f37736",
    "yellow": "#fdf458",
    "green": "#7bc043",
    "blue": "#0392cf",
    "purple": "#b044a0",
}

# color -> file extensions that are drawn in this color
color_settings_filetypes = {
    palette["red"]: ["html", "php"],
    palette["green"]: ["jpg", "png", "jpeg", "gif", "svg", "webp"],
    palette["yellow"]: ["css"],
    "grey": ["txt"],
}

# list of colors, for plots that simply cycle through the palette
color_settings_alternate = list(palette.values())

# color -> browser names that are drawn in this color
color_settings_browsers = {
    palette["red"]: ["Safari"],
    palette["orange"]: ["Firefox"],
    palette["yellow"]: ["Chrome"],
    "grey": ["Edge"],
    palette["green"]: ["Chromium"],
    palette["purple"]: ["Brave"],
}

# color -> platform names that are drawn in this color
color_settings_platforms = {
    palette["red"]: ["Mac"],
    palette["green"]: ["Android"],
    "grey": ["iPhone", "iPad"],
    palette["yellow"]: ["Linux"],
    palette["purple"]: ["BSD"],
    palette["blue"]: ["Windows"],
}
def len_list_list(l: list[list]):
    """
    @returns the sum of the lengths of all lists in l
    """
    return sum(len(inner) for inner in l)
def add_vertikal_labels_in_bar_plot(labels, max_y_val, ax, bar_plot):
    """
    Add the label of the bar in or on top of the bar, depending on the bar size
    @param labels: one label per bar, in the order of bar_plot
    @param max_y_val: reference height, bars higher than 60% of it get their label inside the bar
    """
    # pdebug("add_vertikal_labels_in_bar_plot:", labels)
    for idx, rect in enumerate(bar_plot):
        height = rect.get_height()
        if height > 0.6 * max_y_val:  # if the bar is large, put label in the bar
            height = 0.05 * max_y_val
        ax.text(rect.get_x() + rect.get_width()/2., height + 0.025 * max_y_val,
                labels[idx],
                ha='center', va='bottom', rotation=90)
def add_labels_at_top_of_bar(xdata, ydata, max_y_val, ax, bar_plot):
    """
    add the height of the bar on the top of each bar
    @param xdata: not used
    @param ydata: the value of each bar, in the order of bar_plot
    @param max_y_val: the labels are placed 5% of this value below the top of the bar
    """
    # pdebug("add_labels_at_top_of_bar:", xdata, ydata)
    y_offset = 0.05 * max_y_val
    for idx, rect in enumerate(bar_plot):
        x_center = rect.get_x() + rect.get_width()/2
        y_value = ydata[idx]
        label_box = dict(facecolor='white', alpha=0.8)
        ax.text(x_center, y_value - y_offset, round(y_value, 1), ha='center', bbox=label_box)
def plot_ranking(ranking: list[tuple[int, str]], fig=None, xlabel="", ylabel="", color_settings:dict|list=[], figsize=None):
    """
    make a bar plot of the ranking

    Only the last settings["file_ranking_plot_max_files"] entries of ranking are plotted.
    @param ranking: [(count, name)]
    @param fig: figure to plot into, a new one is created if not given
    @param color_settings: either a dict {color: [name endings]}, where the part of the name after the last "." selects the color,
                           or a list of colors that is cycled through. Bars without a match are blue.
    @returns the figure
    """
    # pdebug(f"plot_ranking: ranking={ranking}")
    if not fig:
        fig = plt.figure(figsize=figsize, dpi=settings["plot_dpi"], linewidth=1.0, frameon=True, subplotpars=None, layout=None)
    ax = fig.add_subplot(xlabel=xlabel, ylabel=ylabel)
    # fill x y data
    start_index = max(0, len(ranking) - settings["file_ranking_plot_max_files"])
    x_names = []
    y_counts = []
    colors = []
    for offset, (count, name) in enumerate(ranking[start_index:]):
        x_names.append(name)
        y_counts.append(count)
        ft = name.split(".")[-1]
        color = palette["blue"]
        if isinstance(color_settings, dict):
            for key, val in color_settings.items():
                if ft in val: color = key
            if not color: color = palette["blue"]
        elif isinstance(color_settings, list) and color_settings:  # an empty list (the default) would cause a modulo by zero
            color = color_settings[offset % len(color_settings)]
        colors.append(color)
    bar = ax.bar(x_names, y_counts, tick_label="", color=colors)

    if len(y_counts) > 0:
        # use the real maximum, the ranking is not guaranteed to be sorted
        max_y_val = max(y_counts)
        add_vertikal_labels_in_bar_plot(x_names, max_y_val, ax, bar)
        if settings["plot_add_count_label"]: add_labels_at_top_of_bar(x_names, y_counts, max_y_val, ax, bar)
    # ax.ylabel(y_counts)
    return fig
# def plot(xdata, ydata, fig=None, ax=None, xlabel="", ylabel="", label="", linestyle='-', marker="", color="blue", rotate_xlabel=0):
# if not fig:
# fig = plt.figure(figsize=None, dpi=settings["plot_dpi"], linewidth=1.0, frameon=True, subplotpars=None, layout=None)
# if not ax:
# ax = fig.add_subplot(xlabel=xlabel, ylabel=ylabel)
# else:
# ax = ax.twinx()
# ax.set_ylabel(ylabel)
# # ax.tick_params(axis="y", labelcolor="r")
# ax.plot(xdata, ydata, marker=marker, label=label, linestyle=linestyle, color=color)
# plt.xticks(rotation=rotate_xlabel)
# if label: ax.legend()
# return fig, ax
def plot2y(xdata, ydata1, ydata2, fig=None, ax1=None, ax2=None, plots=None, xlabel="", ylabel1="", ylabel2="", label1="", label2="", linestyle='-', marker="", color1="blue", color2="orange", grid="major", rotate_xlabel=0, figsize=None):
    """
    plot two data sets with a shared x axis and separate y axes

    To add more lines to an existing plot, pass the returned fig, ax1, ax2 and plots to the next call.
    @param plots: lines of previous calls, used to build a common legend. The list is extended in place.
    @param grid: "major", "minor" or "both" to draw a grid on ax1, anything else for no grid
    @param rotate_xlabel: currently not used, the x tick labels are always rotated by 90 degrees
    @returns fig, ax1, ax2, plots
    """
    if not fig:
        fig = plt.figure(figsize=figsize, dpi=settings["plot_dpi"], linewidth=1.0, frameon=True, subplotpars=None, layout=None)
    if not (ax1 and ax2):
        ax1 = fig.add_subplot(xlabel=xlabel, ylabel=ylabel1)
        ax2 = ax1.twinx()
        ax2.set_ylabel(ylabel2)  # the second axis is created without a label
        ax1.tick_params(axis="x", rotation=90)
    plot1 = ax1.plot(xdata, ydata1, marker=marker, label=label1, linestyle=linestyle, color=color1)
    plot2 = ax2.plot(xdata, ydata2, marker=marker, label=label2, linestyle=linestyle, color=color2)
    # ax1.set_xticks(ax1.get_xticks())
    # ax1.set_xticklabels(xdata, rotation=rotate_xlabel, rotation_mode="anchor")
    # if label1 or label2: ax1.legend()
    if plots: plots += plot1 + plot2
    else: plots = plot1 + plot2
    plt.legend(plots, [ l.get_label() for l in plots])

    if grid in ("major", "minor", "both"):
        if grid in ("minor", "both"):
            # NOTE(review): minor grid lines need minor ticks, which are assumed to be off by default - confirm
            ax1.minorticks_on()
        ax1.grid(visible=True, which=grid, linestyle="-", color="#888")

    return fig, ax1, ax2, plots
def visualize(db: Database):
This assumes sanity checks have been done
if not settings["db"]: missing_arg("db")
if not settings["server_name"]: missing_arg("server_name")
img_dir = settings["img_dir"]
pdebug("img_dir:", img_dir)
img_filetype = settings["img_filetype"]
if isdir(img_dir) and img_filetype:
gen_img = True
print(f"Warning: Not generating images since at least one required variable is invalid: img_dir='{img_dir}', img_filetype='{img_filetype}'")
gen_img = False
img_location = settings["img_location"]
names = {
# paths
"img_route_ranking_last_x_days": f"ranking_routes_last_x_days.{img_filetype}",
"img_referer_ranking_last_x_days": f"ranking_referers_last_x_days.{img_filetype}",
"img_countries_last_x_days": f"ranking_countries_last_x_days.{img_filetype}",
"img_cities_last_x_days": f"ranking_cities_last_x_days.{img_filetype}",
"img_browser_ranking_last_x_days": f"ranking_browsers_last_x_days.{img_filetype}",
"img_platform_ranking_last_x_days": f"ranking_platforms_last_x_days.{img_filetype}",
"img_visitors_and_requests_last_x_days": f"visitor_request_count_daily_last_x_days.{img_filetype}",
"img_route_ranking_total": f"ranking_routes_total.{img_filetype}",
"img_referer_ranking_total": f"ranking_referers_total.{img_filetype}",
"img_countries_total": f"ranking_countries_total.{img_filetype}",
"img_cities_total": f"ranking_cities_total.{img_filetype}",
"img_browser_ranking_total": f"ranking_browsers_total.{img_filetype}",
"img_platform_ranking_total": f"ranking_platforms_total.{img_filetype}",
"img_visitors_and_requests_total": f"visitor_request_count_daily_total.{img_filetype}",
# values
"mobile_visitor_percentage_total": 0.0,
"mobile_visitor_percentage_last_x_days": 0.0,
"visitor_count_last_x_days": 0,
"visitor_count_total": 0,
"request_count_last_x_days": 0,
"request_count_total": 0,
"human_visitor_percentage_last_x_days": 0.0,
"human_visitor_percentage_total": 0.0,
"human_request_percentage_last_x_days": 0.0,
"human_request_percentage_total": 0.0,
# general
"regina_version": settings["version"],
"server_name": settings["server_name"],
"last_x_days": settings["last_x_days"], # must be after all the things with last_x_days!
"earliest_date": "1990-1-1",
"generation_date": "1990-1-1 0:0:0",
db = Database(database_path=settings["db"])
get_humans = settings["get_human_percentage"]
# pdebug(f"visualize: settings {settings}")
earliest_date = db.get_earliest_date()
names["earliest_date"] = dt.fromtimestamp(earliest_date).strftime("%Y-%m-%d")
names["generation_date"] ="%Y-%m-%d %H:%M:%S")
# last_x_days_min_date: latest_date - last_x_days
secs_per_day = 86400
last_x_days_min_date = db.get_latest_date() - settings["last_x_days"] * secs_per_day
last_x_days_constraint = get_where_date_str(min_date=last_x_days_min_date)
last_x_days = db.get_days_where(last_x_days_constraint)
last_x_days_contraints = [get_where_date_str(at_date=day) for day in last_x_days]
all_time_constraint = get_where_date_str(min_date=0)
# all months in yyyy-mm format
months_all_time = db.get_months_where(all_time_constraint)
# sqlite constrict to month string
months_strs = []
for year_month in months_all_time:
year, month = year_month.split("-")
# first day of the month
min_date = dt(int(year), int(month), 1).timestamp()
month = (int(month) % 12) + 1 # + 1 month
year = int(year)
if month == 1: year += 1
# first day of the next month - 1 sec
max_date = dt(year, month, 1).timestamp() - 1
months_strs.append(get_where_date_str(min_date=min_date, max_date=max_date))
for i in range(2):
suffix = ["_total", "_last_x_days"][i]
date_constraint = [all_time_constraint, last_x_days_constraint][i]
date_names = [months_all_time, last_x_days][i]
date_constraints = [months_strs, last_x_days_contraints][i]
assert(len(date_names) == len(date_constraints))
# TODO handle groups
file_ranking = get_route_ranking(db, date_constraint)
if gen_img:
fig_file_ranking = plot_ranking(file_ranking, xlabel="Route Name", ylabel="Number of requests", color_settings=color_settings_filetypes, figsize=settings["plot_size_broad"])
fig_file_ranking.savefig(f"{img_dir}/{names[f'img_route_ranking{suffix}']}", bbox_inches="tight")
referer_ranking = get_ranking(db, "request", "referer", date_constraint, settings["referer_ranking_whitelist"], settings["referer_ranking_whitelist"])
pdebug("Referer ranking", referer_ranking)
if gen_img:
fig_referer_ranking = plot_ranking(referer_ranking, xlabel="HTTP Referer", ylabel="Number of requests", color_settings=color_settings_alternate, figsize=settings["plot_size_broad"])
fig_referer_ranking.savefig(f"{img_dir}/{names[f'img_referer_ranking{suffix}']}", bbox_inches="tight")
if settings["do_geoip_rankings"]:
city_ranking, country_ranking = get_city_and_country_ranking(db, require_humans=settings["geoip_only_humans"])
pdebug("Country ranking:", country_ranking)
pdebug("City ranking:", city_ranking)
if gen_img:
fig_referer_ranking = plot_ranking(country_ranking, xlabel="Country", ylabel="Number of visitors", color_settings=color_settings_alternate, figsize=settings["plot_size_broad"])
fig_referer_ranking.savefig(f"{img_dir}/{names[f'img_countries{suffix}']}", bbox_inches="tight")
fig_referer_ranking = plot_ranking(city_ranking, xlabel="City", ylabel="Number of visitors", color_settings=color_settings_alternate, figsize=settings["plot_size_broad"])
fig_referer_ranking.savefig(f"{img_dir}/{names[f'img_cities{suffix}']}", bbox_inches="tight")
# visitor_agent_ranking = get_visitor_agent_ranking(cur, date_str)
# for the time span
unique_visitor_ids = get_unique_visitor_ids_for_date(db, date_constraint)
unique_visitor_ids_human = []
append_human_visitors(db, unique_visitor_ids, unique_visitor_ids_human)
# for each date
date_count = len(date_constraints)
unique_visitor_ids_dates: list[list[int]] = []
unique_request_ids_dates: list[list[int]] = []
unique_visitor_ids_human_dates: list[list[int]] = [[] for _ in range(date_count)]
unique_request_ids_human_dates: list[list[int]] = [[] for _ in range(date_count)]
for i in range(date_count):
date_constraint_ = date_constraints[i]
unique_visitor_ids_dates.append(get_unique_visitor_ids_for_date(db, date_constraint_))
unique_request_ids_dates.append(get_unique_request_ids_for_date(db, date_constraint_))
if get_humans:
# empty_list = []
# unique_visitor_ids_human_dates.append(empty_list)
append_human_visitors(db, unique_visitor_ids_dates[i], unique_visitor_ids_human_dates[i])
# unique_request_ids_human_dates.append(list())
for human in unique_visitor_ids_human_dates[i]:
append_unique_request_ids_for_date_and_visitor(db, date_constraint_, human, unique_request_ids_human_dates[i])
# print("\n\tuu", unique_visitor_ids_dates, "\n\tur",unique_request_ids_dates, "\n\tuuh", unique_visitor_ids_human_dates, "\n\turh", unique_request_ids_human_dates)
# pdebug("uui", unique_visitor_ids)
# pdebug("uuih", unique_visitor_ids_human)
# pdebug("uuid", unique_visitor_ids_dates)
# pdebug("uuidh", unique_visitor_ids_human_dates)
# pdebug("urid", unique_request_ids_dates)
# pdebug("uridh", unique_visitor_ids_human_dates)
# pdebug(f"human_visitor_precentage: len_list_list(visitor_ids)={len_list_list(unique_visitor_ids_dates)}, len_list_list(visitor_ids_human)={len_list_list(unique_visitor_ids_human_dates)}")
if get_humans:
names[f"human_visitor_percentage{suffix}"] = round(100 * len_list_list(unique_visitor_ids_human_dates) / len_list_list(unique_visitor_ids_dates), 2)
names[f"human_visitor_percentage{suffix}"] = -1.0
names[f"human_request_percentage{suffix}"] = round(100 * len_list_list(unique_request_ids_human_dates) / len_list_list(unique_request_ids_dates), 2)
names[f"human_request_percentage{suffix}"] = -1.0
names[f"visitor_count{suffix}"] = len_list_list(unique_visitor_ids_dates)
names[f"request_count{suffix}"] = len_list_list(unique_request_ids_dates)
if gen_img:
fig_daily, ax1, ax2, plots = plot2y(date_names, [len(visitor_ids) for visitor_ids in unique_visitor_ids_dates], [len(request_ids) for request_ids in unique_request_ids_dates], xlabel="Date", ylabel1="Visitor count", label1="Unique visitors", ylabel2="Request count", label2="Unique requests", color1=palette["red"], color2=palette["blue"], rotate_xlabel=-45, figsize=settings["plot_size_broad"])
if get_humans:
fig_daily, ax1, ax2, plots = plot2y(date_names, [len(visitor_ids) for visitor_ids in unique_visitor_ids_human_dates], [len(request_ids) for request_ids in unique_request_ids_human_dates], label1="Unique visitors (human)", label2="Unique requests (human)", color1=palette["orange"], color2=palette["green"], fig=fig_daily, ax1=ax1, ax2=ax2, plots=plots, rotate_xlabel=-45, figsize=settings["plot_size_broad"])
fig_daily.savefig(f"{img_dir}/{names[f'img_visitors_and_requests{suffix}']}", bbox_inches="tight")
# os & browser
platform_ranking, browser_ranking, names[f"mobile_visitor_percentage{suffix}"] = get_platform_browser_mobile_rankings(db, unique_visitor_ids_human)
if gen_img:
fig_os_rating = plot_ranking(platform_ranking, xlabel="Platform", ylabel="Share [%]", color_settings=color_settings_platforms, figsize=settings["plot_size_narrow"])
fig_os_rating.savefig(f"{img_dir}/{names[f'img_platform_ranking{suffix}']}", bbox_inches="tight")
fig_browser_rating = plot_ranking(browser_ranking, xlabel="Browser", ylabel="Share [%]", color_settings=color_settings_browsers, figsize=settings["plot_size_narrow"])
fig_browser_rating.savefig(f"{img_dir}/{names[f'img_browser_ranking{suffix}']}", bbox_inches="tight")
# print("OS ranking", os_ranking)
# print("Browser ranking", browser_ranking)
# print("Mobile percentage", names["mobile_visitor_percentage"])
if settings["template_html"] and settings["html_out_path"]:
pdebug(f"visualize: writing to html: {settings['html_out_path']}")
with open(settings["template_html"], "r") as file:
html =
for name, value in names.items():
if "img" in name:
value = f"{img_location}/{value}"
if type(value) == float:
value = f"{value:.2f}"
html = html.replace(f"%{name}", str(value))
with open(settings["html_out_path"], "w") as file:
warning(f"Skipping html generation because either template_html or html_out_path is invalid: template_html='{settings['template_html']}', html_out_path='{settings['html_out_path']}'")
@ -12,15 +12,14 @@ if __name__ == "__main__": # make relative imports work as described here: http
import sys
from os import path
filepath = path.realpath(path.abspath(__file__))
sys.path.insert(0, path.dirname(path.dirname(path.dirname(filepath))))
sys.path.insert(0, path.dirname(path.dirname(filepath)))
# local
from .utility.sql_util import replace_null, sanitize, sql_select, sql_exists
from .utility.utility import pdebug, get_filepath, warning, pmessage
from .utility.globals import settings
from .db_operation.request import Request
from .utility.globals import visitor_agent_operating_systems, visitor_agent_browsers, settings
from regina.utility.sql_util import replace_null, sanitize, sql_select, sql_exists
from regina.utility.utility import pdebug, get_filepath, warning, pmessage, is_blacklisted, is_whitelisted
from regina.utility.globals import settings
from regina.data_collection.request import Request
from regina.utility.globals import visitor_agent_operating_systems, visitor_agent_browsers, settings
create reginas database as shown in the uml diagram database.uxf
@ -36,13 +35,17 @@ class Database:
pdebug(f"Database.__init__: Creating database at {database_path}")
with open(pkg_resources.resource_filename("regina", "sql/create_db.sql"), "r") as file:
create_db =
def __call__(self, s):
"""execute a command and return fetchall()"""
return self.cur.fetchall()
def execute(self, s):
def fetchone(self):
return self.cur.fetchone()
@ -160,9 +163,10 @@ class Database:
def add_requests(self, requests: list[Request]):
added_requests = 0
# check the new visitors later
request_blacklist = settings["request_location_regex_blacklist"]
new_visitors = []
for i in range(len(requests)):
if is_blacklisted(requests[i].request_route, settings["request_route_blacklist"]): continue
if not is_whitelisted(requests[i].request_route, settings["request_route_whitelist"]): continue
visitor = self.add_request(requests[i])
if visitor:
@ -267,12 +271,15 @@ class Database:
assert(type(city_id_val) == int)
return city_id_val
def update_geoip_tables(self, geoip_city_csv_path: str):
update the geoip data with the contents of the geoip_city_csv file
Make sure to update the visitor.ip_range_id column for all visitors.
In case something changed, they might point to a different city. (won't fix)
In case something changed, they might point to a different city.
TODO: update the visitor.ip_range_id column to match the (potentially) new city ip range
# indices for the csv
FROM = 0; TO = 1; CODE = 2; COUNTRY = 3; REGION = 4; CITY = 5
@ -331,5 +338,43 @@ class Database:
if combine_range_country_id >= 0: # last range , append
add_range(combine_range_low, combine_range_high, f"City in {combine_range_country_name}", f"Region in {combine_range_country_name}", combine_range_country_id)
def get_earliest_date(self) -> int:
    """Return the smallest request time as unixepoch, or 0 when the query yields no integer."""
    rows = self("SELECT MIN(time) FROM request")
    earliest = rows[0][0]
    return earliest if isinstance(earliest, int) else 0
def get_latest_date(self) -> int:
    """Return the largest request time as unixepoch, or 0 when the query yields no integer."""
    rows = self("SELECT MAX(time) FROM request")
    latest = rows[0][0]
    return latest if isinstance(latest, int) else 0
def get_months_where(self, date_constraint:str) -> list[str]:
    """Get a list of all dates in yyyy-mm format.

    @param date_constraint  sqlite constraint, passed on to get_days_where
    @returns the distinct months, in the order in which they first appear
    """
    days = self.get_days_where(date_constraint)
    # dict.fromkeys removes duplicates while keeping the order of first appearance
    return list(dict.fromkeys(day[0:day.rfind('-')] for day in days))
def get_days_where(self, date_constraint:str) -> list[str]:
    """Get a list of all dates in yyyy-mm-dd format.

    @param date_constraint  sqlite constraint that is inserted after WHERE
    @returns one entry per distinct day returned by the query
    """
    rows = self(f"SELECT DISTINCT DATE(time, 'unixepoch') FROM request WHERE {date_constraint}")
    # every row is a one-element tuple (date, )
    return [row[0] for row in rows]
if __name__ == '__main__':
db = Database("test.db")
@ -1,6 +0,0 @@
"""Gather analytics from nginx access logs and visualize them through generated images and a generated html"""
# __package__ = 'regina'
import regina.utility
from importlib import resources
# ip2nation_db_path = resources.path("regina", "ip2nation.db")
@ -1,666 +0,0 @@
# from sys import path
# print(f"{__file__}: __name__={__name__}, __package__={__package__}, sys.path[0]={path[0]}")
import sqlite3 as sql
from sys import exit
from re import fullmatch
import matplotlib.pyplot as plt
from os.path import isdir
from datetime import datetime as dt
from numpy import empty
# local
from regina.db_operation.database import Database, t_request, t_visitor, t_file, t_filegroup, t_ip_range, t_city, t_country
from regina.utility.sql_util import sanitize, sql_select, sql_exists, sql_insert, sql_tablesize, sql_get_count_where
from regina.utility.utility import pdebug, warning, missing_arg
from regina.utility.globals import settings
visualize information from the database
# shared color palette (hex rgb) that the color settings below draw from
palette = {
    "red": "#ee4035",
    "orange": "#f37736",
    "yellow": "#fdf458",
    "green": "#7bc043",
    "blue": "#0392cf",
    "purple": "#b044a0",
}
# color -> file extensions that are drawn in that color
color_settings_filetypes = {
    palette["red"]: ["html"],
    palette["green"]: ["jpg", "png", "jpeg", "gif", "svg", "webp"],
    palette["yellow"]: ["css"],
    "grey": ["txt"]
}
# plain list of the palette colors, in palette order
color_settings_alternate = list(palette.values())
# color -> browser names that are drawn in that color
color_settings_browsers = {
    palette["red"]: ["Safari"],
    palette["orange"]: ["Firefox"],
    palette["yellow"]: ["Chrome"],
    "grey": ["Edge"],
    palette["green"]: ["Chromium"],
    palette["purple"]: ["Brave"]
}
# color -> operating system names that are drawn in that color
color_settings_operating_systems = {
    palette["red"]: ["Mac"],
    palette["green"]: ["Android"],
    "grey": ["iPhone", "iPad"],
    palette["yellow"]: ["Linux"],
    palette["purple"]: ["BSD"],
    palette["blue"]: ["Windows"],
}
def len_list_list(l: list[list]):
    """Return the total number of elements in all inner lists of l (0 for an empty l)."""
    return sum(len(inner) for inner in l)
def valid_status(status: int):
    """
    Tell whether a http status counts as a successful request.

    Codes from 400 upwards never do, codes below 300 always do and the 3xx codes
    only do when the setting status_300_is_success is enabled.
    """
    if status >= 400:
        return False
    redirect_is_success = settings["status_300_is_success"]
    return status < 300 or bool(redirect_is_success)
def get_os_browser_mobile_rankings(db: Database, visitor_ids: list[int]):
    """
    Rank the platforms and browsers of the given visitors and compute their mobile share.

    Visitors that have neither a platform nor a browser are left out of the mobile share.
    Visitor ids for which the query returns no row are skipped.
    :param db: called with an sql query, must return the result rows
    :param visitor_ids: ids of the visitors to evaluate
    :returns [(percentage, operating_system)], [(percentage, browser)], mobile_visitor_percentage
    """
    platform_counts = {}
    browser_counts = {}
    mobile_counts = {True: 0, False: 0}
    for visitor_id in visitor_ids:
        rows = db(f"SELECT platform,browser,mobile FROM {t_visitor} WHERE visitor_id = {visitor_id}")
        if not rows:  # no such visitor: nothing to count
            continue
        platform, browser, mobile = rows[0]
        if platform:
            platform_counts[platform] = platform_counts.get(platform, 0) + 1
        if browser:
            browser_counts[browser] = browser_counts.get(browser, 0) + 1
        if platform or browser:
            mobile_counts[bool(mobile)] += 1

    classified = mobile_counts[True] + mobile_counts[False]
    # without any classified visitor there is no share to compute
    mobile_visitor_percentage = mobile_counts[True] / classified if classified else 0.0

    # the totals are only divided by when there is at least one entry, so they are never 0 here
    platform_total = sum(platform_counts.values())
    browser_total = sum(browser_counts.values())
    os_ranking = [(count * 100/platform_total, name) for name, count in platform_counts.items()]
    browser_ranking = [(count * 100/browser_total, name) for name, count in browser_counts.items()]
    return os_ranking, browser_ranking, mobile_visitor_percentage*100
def get_where_date_str(at_date=None, min_date=None, max_date=None):
# dates in unix time
s = ""
if at_date is not None:
if isinstance(at_date, str):
s += f"DATE(date, 'unixepoch') = '{sanitize(at_date)}' AND "
elif isinstance(at_date, int|float):
s += f"date = {int(at_date)} AND "
print(f"WARNING: get_where_date_str: Invalid type of argument at_date: {type(at_date)}")
if min_date is not None:
if isinstance(min_date, str):
s += f"DATE(date, 'unixepoch') >= '{sanitize(min_date)}' AND "
elif isinstance(min_date, int|float):
s += f"date >= {int(min_date)} AND "
print(f"WARNING: get_where_date_str: Invalid type of argument min_date: {type(min_date)}")
if max_date is not None:
if isinstance(max_date, str):
s += f"DATE(date, 'unixepoch') <= '{sanitize(max_date)}' AND "
elif isinstance(max_date, int|float):
s += f"date <= {int(max_date)} AND "
print(f"WARNING: get_where_date_str: Invalid type of argument max_date: {type(max_date)}")
if s == "":
print(f"WARNING: get_where_date_str: no date_str generated. Returning 'date > 0'. at_date={at_date}, min_date={min_date}, max_date={max_date}")
return "date > 0"
return s.removesuffix(" AND ")
# get the earliest date
def get_earliest_date(db: Database) -> int:
    """Return the smallest request date as unixepoch, or 0 when the query yields no integer."""
    rows = db(f"SELECT MIN(date) FROM {t_request}")
    earliest = rows[0][0]
    return earliest if isinstance(earliest, int) else 0
# get the latest date
def get_latest_date(db: Database) -> int:
    """Return the largest request date as unixepoch, or 0 when the query yields no integer."""
    rows = db(f"SELECT MAX(date) FROM {t_request}")
    latest = rows[0][0]
    return latest if isinstance(latest, int) else 0
# get all dates
# the date:str parameter in all these function must be a sqlite constraint
def get_days(db: Database, date:str) -> list[str]:
    """
    Get a list of all dates in yyyy-mm-dd format.

    :param date: sqlite constraint that is inserted after WHERE
    """
    rows = db(f"SELECT DISTINCT DATE(date, 'unixepoch') FROM {t_request} WHERE {date}")
    # every row is a one-element tuple (date, )
    return [row[0] for row in rows]
def get_months(db: Database, date:str) -> list[str]:
    """
    Get a list of all dates in yyyy-mm format.

    :param date: sqlite constraint, passed on to get_days
    :returns the distinct months, in the order in which they first appear
    """
    months = []
    for day in get_days(db, date):
        month = day[0:day.rfind('-')]
        if month not in months:
            months.append(month)
    return months
def get_visitor_agent(db: Database, visitor_id: int):
    """Return the third column of the first visitor row that sql_select finds for visitor_id."""
    matching_rows = sql_select(db.cur, t_visitor, [("visitor_id", visitor_id)])
    first_row = matching_rows[0]
    return first_row[2]
def get_unique_visitor_ids_for_date(db: Database, date:str) -> list[int]:
    """
    Get the distinct visitor ids of all requests that satisfy the constraint.

    :param date: sqlite constraint that is inserted after WHERE
    """
    visitor_ids = []
    for row in db(f"SELECT DISTINCT visitor_id FROM {t_request} WHERE {date}"):
        visitor_ids.append(row[0])
    return visitor_ids
# Looks up the is_human flag of every id in unique_visitor_ids.
# NOTE(review): this block is incomplete in this view - the docstring quotes and the
# statements that follow the is_human check are missing. Presumably human visitor ids
# are collected in unique_visitor_ids_human; confirm against the full file.
# NOTE(review): `cur` is not defined in this function, the parameter is called `db`.
def get_human_visitors(db: Database, unique_visitor_ids, unique_visitor_ids_human: list):
check if they have a known platform AND browser
check if at least one request did not result in an error (http status >= 400)
for visitor_id in unique_visitor_ids:
cur.execute(f"SELECT is_human FROM {t_visitor} WHERE visitor_id = {visitor_id}")
# if not visitor
if cur.fetchone()[0] == 0:
# pdebug(f"get_human_visitors: {visitor_id}, is_human is 0")
# pdebug(f"get_human_visitors: {visitor_id}, is_human is non-zero")
# visitor is human
# pdebug("get_human_visitors: (2)", unique_visitor_ids_human)
def get_unique_request_ids_for_date(db: Database, date:str):
    """
    Get the distinct request ids of all requests that satisfy the constraint.

    :param db: database whose cursor (db.cur) is used for the query
    :param date: sqlite constraint that is inserted after WHERE
    """
    cur = db.cur  # the body used an undefined name `cur` before
    cur.execute(f"SELECT DISTINCT request_id FROM {t_request} WHERE {date}")
    return [ request_id[0] for request_id in cur.fetchall()]
# Queries the distinct request ids of one visitor that satisfy the sqlite constraint `date`.
# NOTE(review): the body of the for loop is missing in this view. Presumably the ids
# are collected in unique_request_ids_human; confirm against the full file.
# NOTE(review): `cur` is not defined in this function, the parameter is called `db`.
def get_unique_request_ids_for_date_and_visitor(db: Database, date:str, visitor_id: int, unique_request_ids_human: list):
cur.execute(f"SELECT DISTINCT request_id FROM {t_request} WHERE {date} AND visitor_id = {visitor_id}")
# all unique requests for visitor_id
for request_id in cur.fetchall():
# get number of requests per day
def get_request_count_for_date(db: Database, date:str) -> int:
    """
    Count all requests that satisfy the constraint.

    :param db: database whose cursor (db.cur) is used for the query
    :param date: sqlite constraint that is inserted after WHERE
    """
    cur = db.cur  # the body used an undefined name `cur` before
    cur.execute(f"SELECT COUNT(*) FROM {t_request} WHERE {date}")
    return cur.fetchone()[0]
def get_unique_visitor_count(db: Database) -> int:
    """Return the size of the visitor table as reported by sql_tablesize."""
    # pass the cursor of the database, the body used an undefined name `cur` before
    return sql_tablesize(db.cur, t_visitor)
# Builds a list of (request_count, groupname) tuples, one per row of the filegroup table,
# counting the requests of each group that satisfy the sqlite constraint `date`.
# NOTE(review): this block is incomplete in this view - the docstring quotes and the
# statements that end the whitelist / error-file checks are missing (presumably
# `continue` / `break`); confirm against the full file.
# NOTE(review): `cur` is not defined in this function, the parameter is called `db`.
def get_file_ranking(db: Database, date:str) -> list[tuple[int, str]]:
global settings
:returns [(request_count, groupname)]
ranking = []
cur.execute(f"SELECT group_id, groupname FROM {t_filegroup}")
for group in cur.fetchall():
group_id = group[0]
# filename = sql_select(cur, t_file, [("group_id", group)])
# if len(filename) == 0: continue
# filename = filename[0][0]
filename = group[1]
# optional filter: the groupname has to fully match the whitelist regex
if settings["file_ranking_regex_whitelist"]: # if file in whitelist
if not fullmatch(settings["file_ranking_regex_whitelist"], filename):
pdebug(f"get_file_ranking: file with group_id {group_id} is not in whitelist")
# optional filter: at least one request to the group needs a status accepted by valid_status
if settings["file_ranking_ignore_error_files"]: # if request to file was successful
success = False
cur.execute(f"SELECT status FROM {t_request} WHERE group_id = {group_id}")
for status in cur.fetchall():
if valid_status(status[0]):
pdebug(f"get_file_ranking: success code {status[0]} for file with group_id {group_id} and groupname {filename}")
success = True
if not success:
pdebug(f"get_file_ranking: file with group_id {group_id} and groupname {filename} has only requests resulting in error")
# ranking.append((sql_get_count_where(cur, t_request, [("group_id", group)]), filename))
cur.execute(f"SELECT COUNT(*) FROM {t_request} WHERE group_id = {group_id} AND {date}")
ranking.append((cur.fetchone()[0], filename))
# print(ranking)
return ranking
def get_visitor_agent_ranking(db: Database, date:str) -> list[tuple[int, str]]:
    """
    Count the requests of every visitor that made a request satisfying the constraint.

    Visitors without a row in the visitor table are skipped, as are visitors whose
    agent does not fully match the setting visitor_agent_ranking_regex_whitelist (if set).
    :param db: database whose cursor (db.cur) is used for the queries
    :param date: sqlite constraint on the requests
    :returns [(request_count, visitor_agent)]
    """
    cur = db.cur  # the body used an undefined name `cur` before
    whitelist = settings["visitor_agent_ranking_regex_whitelist"]
    ranking = []
    cur.execute(f"SELECT DISTINCT visitor_id FROM {t_request} WHERE {date}")
    # fetchall() copies the ids, so the cursor can be reused inside the loop
    for (visitor_id,) in cur.fetchall():
        visitor_rows = sql_select(cur, t_visitor, [("visitor_id", visitor_id)])
        if len(visitor_rows) == 0: continue
        visitor_agent = visitor_rows[0][2]
        if whitelist and not fullmatch(whitelist, visitor_agent):
            # not whitelisted: leave this visitor out of the ranking
            continue
        cur.execute(f"SELECT COUNT(*) FROM {t_request} WHERE visitor_id = {visitor_id} AND {date}")
        ranking.append((cur.fetchone()[0], visitor_agent))
    return ranking
def get_request_ranking(field_name: str, table: str, whitelist_regex: str, db: Database, date_condition:str) -> list[tuple[int, str]]:
    """
    1) get all the distinct entries for field_name that satisfy date_condition
    2) skip the entries that do not fully match whitelist_regex (if one is given)
    3) for every remaining entry, get the count in table for date_condition
    4) sort by count in ascending order

    :param db: database whose cursor (db.cur) is used for the queries
    :returns [(request_count, name)]
    """
    cur = db.cur  # the body used an undefined name `cur` before
    ranking = []
    cur.execute(f"SELECT DISTINCT {field_name} FROM {table} WHERE {date_condition}")
    # fetchall() copies the names, so the cursor can be reused inside the loop
    for (name,) in cur.fetchall():
        if whitelist_regex and not fullmatch(whitelist_regex, name):
            continue
        # the name comes from logged request data, so it is bound as a parameter:
        # a quote inside it would break (or alter) a string-built query
        cur.execute(f"SELECT COUNT(*) FROM {table} WHERE {field_name} = ? AND {date_condition}", (name,))
        ranking.append((cur.fetchone()[0], name))
    ranking.sort()
    return ranking
# re_uri_protocol = f"(https?)://"
# optional protocol including the "://" separator (capture group 1 of re_uri_full)
re_uri_protocol = f"(https?://)?"
# four groups of 1-3 digits, each optionally followed by a dot, plus an optional ":port"
re_uri_ipv4 = r"(?:(?:(?:\d{1,3}\.?){4})(?::\d+)?)"
# re_uri_ipv6 = ""
# host name ending in a tld of at least two letters, the capture group holds the leading subdomain part
re_uri_domain = r"(?:([^/]+\.)*[^/]+\.[a-zA-Z]{2,})"
# optional location, the capture group holds the part after the leading slash
re_uri_location = r"(?:/(.*))?"
# capture groups of the full pattern: 1 protocol, 2 host (domain or ipv4), 3 subdomains,
# 4 location with the leading slash, 5 location without the leading slash
re_uri_full = f"{re_uri_protocol}({re_uri_domain}|{re_uri_ipv4})({re_uri_location})"
# (https?://)?((?:([^/]+\.)*[^/]+\.[a-zA-Z]{2,})|(?:(?:(?:\d{1,3}\.?){4})(?::\d+)?))((?:/(.*))?)
def cleanup_referer(referer: str) -> str:
    """
    split the referer uri into its parts and reassemble them depending on settings

    The referer_ranking_ignore_* settings decide whether the tld, the subdomains,
    the protocol and the location are part of the result.
    :returns the cleaned up referer, or the unchanged referer if it does not match re_uri_full
    """
    m = fullmatch(re_uri_full, referer)
    if not m:
        warning(f"cleanup_referer: Could not match referer '{referer}'")
        return referer
    # pdebug(f"cleanup_referer: {referer} - {m.groups()}")
    protocol, host, subdomains, location = m.groups()[:4]
    # optional groups that did not take part in the match are None
    protocol = protocol or ""
    subdomains = subdomains or ""
    # strip the subdomains only from the front: str.replace would also remove the
    # same text if it occurs again further back in the host
    domain = host.removeprefix(subdomains)

    referer = domain
    if settings["referer_ranking_ignore_tld"]:
        if len(domain.split(".")) == 2:  # if domain.tld
            referer = domain.split(".")[0]
    if not settings["referer_ranking_ignore_subdomain"]: referer = subdomains + referer
    if not settings["referer_ranking_ignore_protocol"]: referer = protocol + referer
    if not settings["referer_ranking_ignore_location"]: referer += location
    # pdebug(f"cleanup_referer: cleaned up: {referer}")
    return referer
def cleanup_referer_ranking(referer_ranking: list[tuple[int, str]]):
    """
    Clean up every referer of the ranking and merge the entries that become identical.

    The list is modified in place: afterwards it holds one (count, referer) tuple per
    cleaned up referer, with the counts of the merged entries added up.
    """
    unique_referers = dict()
    for count, referer in referer_ranking:
        referer = cleanup_referer(referer)
        unique_referers[referer] = unique_referers.get(referer, 0) + count
    # replace the old entries instead of appending to them,
    # otherwise every request would be listed (and counted) twice
    referer_ranking[:] = [(count, referer) for referer, count in unique_referers.items()]
# Counts the fetched rows per city and per country and returns both as lists of (count, name) tuples.
# Names that fully match the respective blacklist regex do not get an entry.
# NOTE(review): this block is garbled in this view - the column list of the SELECT is
# incomplete, no statement executing sql_cmd is visible and the `else:` lines between the
# "already counted" and the "first occurrence" branches are missing; confirm against the full file.
def get_city_and_country_ranking(cur:sql.Cursor, require_humans=True, regex_city_blacklist="", regex_country_blacklist=""):
sql_cmd = f"SELECT, c.code, FROM {t_country} AS c, {t_city} as ci, {t_visitor} as u, {t_ip_range} as i WHERE u.ip_range_id = i.ip_range_id AND i.city_id = ci.city_id AND ci.country_id = c.country_id"
if require_humans: sql_cmd += " AND u.is_human = 1"
pdebug(f"get_city_and_country_ranking: require_humans={require_humans}, regex_city_blacklist='{regex_city_blacklist}', regex_country_blacklist='{regex_country_blacklist}'")
cities = cur.fetchall()
cities_dict = {}
country_dict = {}
pdebug(f"get_city_and_country_ranking: found {len(cities)} ip_ranges")
# accept every name unless a blacklist regex is given
validate_city_cmd = lambda _ : True
validate_country_cmd = lambda _ : True
if len(regex_city_blacklist) > 0: validate_city_cmd = lambda city : fullmatch(regex_city_blacklist, city) is None
if len(regex_country_blacklist) > 0 : validate_country_cmd = lambda country : fullmatch(regex_country_blacklist, country) is None
for i in range(len(cities)):
# column 0 is used as the key of the city ranking
if cities[i][0] in cities_dict:
cities_dict[cities[i][0]][0] += 1
if validate_city_cmd(cities[i][0]):
cities_dict[cities[i][0]] = [1, cities[i][1], cities[i][2]] # count, country code
# column 2 is used as the key of the country ranking
if cities[i][2] in country_dict:
country_dict[cities[i][2]] += 1
if validate_country_cmd(cities[i][2]):
country_dict[cities[i][2]] = 1 # count, country code
# the city label is built from the city key and column 1 in parentheses
city_ranking = [(v[0], f"{k} ({v[1]})") for k,v in cities_dict.items()]
country_ranking = [(v, k) for k,v in country_dict.items()]
return city_ranking, country_ranking
# add value labels
def add_vertikal_labels_in_bar_plot(labels, max_y_val, ax, bar_plot):
    """
    Write the label of every bar vertically at the bar.

    The label starts just above the top of the bar. For bars higher than 60% of
    max_y_val it is placed inside the bar instead, near the bottom.
    :param labels: one label per bar, in the order of bar_plot
    :param max_y_val: largest y value of the plot, used to scale the offsets
    :param ax: axis to draw the text on
    :param bar_plot: the bars, each offering get_x(), get_width() and get_height()
    """
    # pdebug("add_vertikal_labels_in_bar_plot:", labels)
    for idx,rect in enumerate(bar_plot):
        height = rect.get_height()
        if height > 0.6 * max_y_val:  # if the bar is large, put label in the bar
            height = 0.05 * max_y_val
        # the text to draw is the label that belongs to this bar
        ax.text(rect.get_x() + rect.get_width()/2., height + 0.025 * max_y_val,
                labels[idx],
                ha='center', va='bottom', rotation=90)
# add count labels
def add_labels_at_top_of_bar(xdata, ydata, max_y_val, ax, bar_plot):
    """
    Write the value of every bar (rounded to one decimal) in a white box near the top of the bar.

    The text is placed 5% of max_y_val below the y value of the bar. xdata is not used.
    """
    # pdebug("add_labels_at_top_of_bar:", xdata, ydata)
    drop = 0.05 * max_y_val
    for position, bar in enumerate(bar_plot):
        center_x = bar.get_x() + bar.get_width()/2
        value = ydata[position]
        ax.text(center_x, value - drop, round(value, 1), ha='center', bbox=dict(facecolor='white', alpha=0.8))
# Draws the last settings["file_ranking_plot_max_files"] entries of ranking as a bar plot and returns the figure.
# NOTE(review): this block is garbled in this view - the docstring quotes, the statements
# that fill x_names / y_counts / colors and the left part of the bar call are missing;
# confirm against the full file.
# NOTE(review): color_settings has a mutable default ([]); it is only read in the visible code.
def plot_ranking(ranking: list[tuple[int, str]], fig=None, xlabel="", ylabel="", color_settings:dict|list=[], figsize=None):
make a bar plot of the most requested files
# pdebug(f"plot_ranking: ranking={ranking}")
if not fig:
fig = plt.figure(figsize=figsize, dpi=settings["plot_dpi"], linewidth=1.0, frameon=True, subplotpars=None, layout=None)
# create new axis if none is given
ax = fig.add_subplot(xlabel=xlabel, ylabel=ylabel)
# fill x y data
# only the entries at the end of the ranking are plotted if there are too many
if len(ranking) > settings["file_ranking_plot_max_files"]:
start_index = len(ranking) - settings["file_ranking_plot_max_files"]
else: start_index = 0
x_names = []
y_counts = []
colors = []
for i in range(start_index, len(ranking)):
# text after the last "." of the name (the whole name if it has no dot)
ft = ranking[i][1].split(".")[-1]
color = palette["blue"]
# if not color_settings: color = palette["blue"]
# dict: the key whose value list contains ft is the color
if isinstance(color_settings, dict):
for key, val in color_settings.items():
if ft in val: color = key
if not color: color = palette["blue"]
# list: the colors are cycled through
elif isinstance(color_settings, list):
# print(color_settings, (i - start_index) % len(color_settings))
color = color_settings[(i - start_index) % len(color_settings)]
bar =, y_counts, tick_label="", color=colors)
# the last count is handed to the label helpers as max_y_val
# assumes the ranking is sorted in ascending order - TODO confirm
if len(y_counts) > 0:
add_vertikal_labels_in_bar_plot(x_names, y_counts[-1], ax, bar)
if settings["plot_add_count_label"]: add_labels_at_top_of_bar(x_names, y_counts, y_counts[-1], ax, bar)
# ax.ylabel(y_counts)
return fig
# def plot(xdata, ydata, fig=None, ax=None, xlabel="", ylabel="", label="", linestyle='-', marker="", color="blue", rotate_xlabel=0):
# if not fig:
# fig = plt.figure(figsize=None, dpi=settings["plot_dpi"], linewidth=1.0, frameon=True, subplotpars=None, layout=None)
# if not ax:
# ax = fig.add_subplot(xlabel=xlabel, ylabel=ylabel)
# else:
# ax = ax.twinx()
# ax.set_ylabel(ylabel)
# # ax.tick_params(axis="y", labelcolor="r")
# ax.plot(xdata, ydata, marker=marker, label=label, linestyle=linestyle, color=color)
# plt.xticks(rotation=rotate_xlabel)
# if label: ax.legend()
# return fig, ax
def plot2y(xdata, ydata1, ydata2, fig=None, ax1=None, ax2=None, plots=None, xlabel="", ylabel1="", ylabel2="", label1="", label2="", linestyle='-', marker="", color1="blue", color2="orange", grid="major", rotate_xlabel=0, figsize=None):
    """
    Plot two data series over the same x data, each one on its own y axis.

    To add further series to an existing plot, pass the fig, ax1, ax2 and plots
    that a previous call returned.
    :param plots: lines of previous calls, they are kept in the legend (the list is extended in place)
    :param grid: "major", "minor" or "both" to draw a grid on ax1, anything else for no grid
    :param rotate_xlabel: currently unused, the x tick labels are always rotated by 90 degrees
    :returns fig, ax1, ax2, plots
    """
    if not fig:
        fig = plt.figure(figsize=figsize, dpi=settings["plot_dpi"], linewidth=1.0, frameon=True, subplotpars=None, layout=None)
    if not (ax1 and ax2):
        ax1 = fig.add_subplot(xlabel=xlabel, ylabel=ylabel1)
        ax2 = ax1.twinx()
        # ylabel2 was accepted but never applied to the second axis
        ax2.set_ylabel(ylabel2)
        ax1.tick_params(axis="x", rotation=90)
    plot1 = ax1.plot(xdata, ydata1, marker=marker, label=label1, linestyle=linestyle, color=color1)
    plot2 = ax2.plot(xdata, ydata2, marker=marker, label=label2, linestyle=linestyle, color=color2)
    # ax1.set_xticks(ax1.get_xticks())
    # ax1.set_xticklabels(xdata, rotation=rotate_xlabel, rotation_mode="anchor")
    # if label1 or label2: ax1.legend()
    if plots: plots += plot1 + plot2
    else: plots = plot1 + plot2
    plt.legend(plots, [ l.get_label() for l in plots])

    if grid in ("major", "minor", "both"):
        # `grid == "minor" or "both"` was always true; minor grid lines additionally
        # need the minor ticks to be switched on
        if grid in ("minor", "both"):
            ax1.minorticks_on()
        ax1.grid(visible=True, which=grid, linestyle="-", color="#888")
    return fig, ax1, ax2, plots
# Entry point of the visualization: computes the rankings and visitor/request counts
# once for all time ("_total") and once for the last x days ("_last_x_days"), saves
# the plots to img_dir and fills the html template with the values collected in `names`.
# NOTE(review): this block is garbled in this view - several `else:` / `try:` /
# closing-brace lines and parts of some expressions are missing; confirm against the full file.
# NOTE(review): the helpers called below are annotated with `db: Database` but are passed
# the sqlite cursor `cur` here.
def visualize(loaded_settings: dict):
global settings
settings = loaded_settings
if not settings["db"]: missing_arg("db")
if not settings["server_name"]: missing_arg("server_name")
img_dir = settings["img_dir"]
pdebug("img_dir:", img_dir)
img_filetype = settings["img_filetype"]
img_location = settings["img_location"]
# every key is a placeholder name: in the html template "%<key>" is replaced by the value
names = {
# paths
"img_file_ranking_last_x_days": f"ranking_files_last_x_days.{img_filetype}",
"img_referer_ranking_last_x_days": f"ranking_referers_last_x_days.{img_filetype}",
"img_countries_last_x_days": f"ranking_countries_last_x_days.{img_filetype}",
"img_cities_last_x_days": f"ranking_cities_last_x_days.{img_filetype}",
"img_browser_ranking_last_x_days": f"ranking_browsers_last_x_days.{img_filetype}",
"img_operating_system_ranking_last_x_days": f"ranking_operating_systems_last_x_days.{img_filetype}",
"img_visitors_and_requests_last_x_days": f"visitor_request_count_daily_last_x_days.{img_filetype}",
"img_file_ranking_total": f"ranking_files_total.{img_filetype}",
"img_referer_ranking_total": f"ranking_referers_total.{img_filetype}",
"img_countries_total": f"ranking_countries_total.{img_filetype}",
"img_cities_total": f"ranking_cities_total.{img_filetype}",
"img_browser_ranking_total": f"ranking_browsers_total.{img_filetype}",
"img_operating_system_ranking_total": f"ranking_operating_systems_total.{img_filetype}",
"img_visitors_and_requests_total": f"visitor_request_count_daily_total.{img_filetype}",
# values
"mobile_visitor_percentage_total": 0.0,
"mobile_visitor_percentage_last_x_days": 0.0,
"visitor_count_last_x_days": 0,
"visitor_count_total": 0,
"request_count_last_x_days": 0,
"request_count_total": 0,
"human_visitor_percentage_last_x_days": 0.0,
"human_visitor_percentage_total": 0.0,
"human_request_percentage_last_x_days": 0.0,
"human_request_percentage_total": 0.0,
# general
"regina_version": settings["version"],
"server_name": settings["server_name"],
"last_x_days": settings["last_x_days"], # must be after all the things with last_x_days!
"earliest_date": "1990-1-1",
"generation_date": "1990-1-1 0:0:0",
conn = sql.connect(settings["db"])
# images are only generated if the output directory exists and a filetype is set
# NOTE(review): the `else:` between the two branches is missing in this view
if isdir(img_dir) and img_filetype:
gen_img = True
print(f"Warning: Not generating images since at least one required variable is invalid: img_dir='{img_dir}', img_filetype='{img_filetype}'")
gen_img = False
cur = conn.cursor()
get_humans = settings["get_human_percentage"]
# pdebug(f"visualize: settings {settings}")
earliest_date = get_earliest_date(cur)
names["earliest_date"] = dt.fromtimestamp(earliest_date).strftime("%Y-%m-%d")
# NOTE(review): the expression in front of the format string is missing in this view
names["generation_date"] ="%Y-%m-%d %H:%M:%S")
# last_x_days_min_date: latest_date - last_x_days
secs_per_day = 86400
last_x_days_min_date = get_latest_date(cur) - settings["last_x_days"] * secs_per_day
last_x_days_str = get_where_date_str(min_date=last_x_days_min_date)
days = get_days(cur, last_x_days_str)
days_strs = [get_where_date_str(at_date=day) for day in days]
all_time_str = get_where_date_str(min_date=0)
# all months in yyyy-mm format
months_all_time = get_months(cur, all_time_str)
# sqlite constrict to month string
months_strs = []
for year_month in months_all_time:
year, month = year_month.split("-")
# first day of the month
min_date = dt(int(year), int(month), 1).timestamp()
month = (int(month) % 12) + 1 # + 1 month
year = int(year)
if month == 1: year += 1
# first day of the next month - 1 sec
max_date = dt(year, month, 1).timestamp() - 1
months_strs.append(get_where_date_str(min_date=min_date, max_date=max_date))
# pass 0: all time, one data point per month; pass 1: last x days, one data point per day
for i in range(2):
suffix = ["_total", "_last_x_days"][i]
date_str = [all_time_str, last_x_days_str][i]
date_names = [months_all_time, days][i]
date_strs = [months_strs, days_strs][i]
assert(len(date_names) == len(date_strs))
file_ranking = get_file_ranking(cur, date_str)
if gen_img:
fig_file_ranking = plot_ranking(file_ranking, xlabel="Filename/Filegroup", ylabel="Number of requests", color_settings=color_settings_filetypes, figsize=settings["plot_size_broad"])
fig_file_ranking.savefig(f"{img_dir}/{names[f'img_file_ranking{suffix}']}", bbox_inches="tight")
referer_ranking = get_request_ranking("referer", t_request, settings["referer_ranking_regex_whitelist"], cur, date_str)
pdebug("Referer ranking", referer_ranking)
if gen_img:
fig_referer_ranking = plot_ranking(referer_ranking, xlabel="HTTP Referer", ylabel="Number of requests", color_settings=color_settings_alternate, figsize=settings["plot_size_broad"])
fig_referer_ranking.savefig(f"{img_dir}/{names[f'img_referer_ranking{suffix}']}", bbox_inches="tight")
if settings["do_geoip_rankings"]:
city_ranking, country_ranking = get_city_and_country_ranking(cur, require_humans=settings["geoip_only_humans"], regex_city_blacklist=settings["city_ranking_regex_blacklist"], regex_country_blacklist=settings["country_ranking_regex_blacklist"])
pdebug("Country ranking:", country_ranking)
pdebug("City ranking:", city_ranking)
if gen_img:
fig_referer_ranking = plot_ranking(country_ranking, xlabel="Country", ylabel="Number of visitors", color_settings=color_settings_alternate, figsize=settings["plot_size_broad"])
fig_referer_ranking.savefig(f"{img_dir}/{names[f'img_countries{suffix}']}", bbox_inches="tight")
fig_referer_ranking = plot_ranking(city_ranking, xlabel="City", ylabel="Number of visitors", color_settings=color_settings_alternate, figsize=settings["plot_size_broad"])
fig_referer_ranking.savefig(f"{img_dir}/{names[f'img_cities{suffix}']}", bbox_inches="tight")
# visitor_agent_ranking = get_visitor_agent_ranking(cur, date_str)
# for the time span
unique_visitor_ids = get_unique_visitor_ids_for_date(cur, date_str)
unique_visitor_ids_human = []
get_human_visitors(cur, unique_visitor_ids, unique_visitor_ids_human)
# for each date
date_count = len(date_strs)
unique_visitor_ids_dates: list[list[int]] = []
unique_request_ids_dates: list[list[int]] = []
unique_visitor_ids_human_dates: list[list[int]] = [[] for _ in range(date_count)]
unique_request_ids_human_dates: list[list[int]] = [[] for _ in range(date_count)]
for i in range(date_count):
date_str_ = date_strs[i]
unique_visitor_ids_dates.append(get_unique_visitor_ids_for_date(cur, date_str_))
unique_request_ids_dates.append(get_unique_request_ids_for_date(cur, date_str_))
if get_humans:
# empty_list = []
# unique_visitor_ids_human_dates.append(empty_list)
get_human_visitors(cur, unique_visitor_ids_dates[i], unique_visitor_ids_human_dates[i])
# unique_request_ids_human_dates.append(list())
for human in unique_visitor_ids_human_dates[i]:
get_unique_request_ids_for_date_and_visitor(cur, date_str_, human, unique_request_ids_human_dates[i])
# print("\n\tuu", unique_visitor_ids_dates, "\n\tur",unique_request_ids_dates, "\n\tuuh", unique_visitor_ids_human_dates, "\n\turh", unique_request_ids_human_dates)
# pdebug("uui", unique_visitor_ids)
# pdebug("uuih", unique_visitor_ids_human)
# pdebug("uuid", unique_visitor_ids_dates)
# pdebug("uuidh", unique_visitor_ids_human_dates)
# pdebug("urid", unique_request_ids_dates)
# pdebug("uridh", unique_visitor_ids_human_dates)
# pdebug(f"human_visitor_precentage: len_list_list(visitor_ids)={len_list_list(unique_visitor_ids_dates)}, len_list_list(visitor_ids_human)={len_list_list(unique_visitor_ids_human_dates)}")
# NOTE(review): each percentage is followed by a -1.0 assignment; the lines that select
# between the two (presumably a fallback for a failing division) are missing in this view
if get_humans:
names[f"human_visitor_percentage{suffix}"] = round(100 * len_list_list(unique_visitor_ids_human_dates) / len_list_list(unique_visitor_ids_dates), 2)
names[f"human_visitor_percentage{suffix}"] = -1.0
names[f"human_request_percentage{suffix}"] = round(100 * len_list_list(unique_request_ids_human_dates) / len_list_list(unique_request_ids_dates), 2)
names[f"human_request_percentage{suffix}"] = -1.0
names[f"visitor_count{suffix}"] = len_list_list(unique_visitor_ids_dates)
names[f"request_count{suffix}"] = len_list_list(unique_request_ids_dates)
if gen_img:
fig_daily, ax1, ax2, plots = plot2y(date_names, [len(visitor_ids) for visitor_ids in unique_visitor_ids_dates], [len(request_ids) for request_ids in unique_request_ids_dates], xlabel="Date", ylabel1="Visitor count", label1="Unique visitors", ylabel2="Request count", label2="Unique requests", color1=palette["red"], color2=palette["blue"], rotate_xlabel=-45, figsize=settings["plot_size_broad"])
# the human series are drawn into the same figure and axes
if get_humans:
fig_daily, ax1, ax2, plots = plot2y(date_names, [len(visitor_ids) for visitor_ids in unique_visitor_ids_human_dates], [len(request_ids) for request_ids in unique_request_ids_human_dates], label1="Unique visitors (human)", label2="Unique requests (human)", color1=palette["orange"], color2=palette["green"], fig=fig_daily, ax1=ax1, ax2=ax2, plots=plots, rotate_xlabel=-45, figsize=settings["plot_size_broad"])
fig_daily.savefig(f"{img_dir}/{names[f'img_visitors_and_requests{suffix}']}", bbox_inches="tight")
# os & browser
os_ranking, browser_ranking, names[f"mobile_visitor_percentage{suffix}"] = get_os_browser_mobile_rankings(cur, unique_visitor_ids_human)
if gen_img:
fig_os_rating = plot_ranking(os_ranking, xlabel="Platform", ylabel="Share [%]", color_settings=color_settings_operating_systems, figsize=settings["plot_size_narrow"])
fig_os_rating.savefig(f"{img_dir}/{names[f'img_operating_system_ranking{suffix}']}", bbox_inches="tight")
fig_browser_rating = plot_ranking(browser_ranking, xlabel="Browsers", ylabel="Share [%]", color_settings=color_settings_browsers, figsize=settings["plot_size_narrow"])
fig_browser_rating.savefig(f"{img_dir}/{names[f'img_browser_ranking{suffix}']}", bbox_inches="tight")
# print("OS ranking", os_ranking)
# print("Browser ranking", browser_ranking)
# print("Mobile percentage", names["mobile_visitor_percentage"])
# fill the html template: "%<name>" is replaced by its value, image names are prefixed with img_location
# NOTE(review): the right-hand side of `html =`, the statement writing the output file and
# the `else:` in front of the final warning are missing in this view
if settings["template_html"] and settings["html_out_path"]:
pdebug(f"visualize: writing to html: {settings['html_out_path']}")
with open(settings["template_html"], "r") as file:
html =
for name, value in names.items():
if "img" in name:
value = f"{img_location}/{value}"
html = html.replace(f"%{name}", str(value))
with open(settings["html_out_path"], "w") as file:
warning(f"Skipping html generation because either template_html or html_out_path is invalid: template_html='{settings['template_html']}', html_out_path='{settings['html_out_path']}'")
Normal file
Normal file
@ -0,0 +1,155 @@
# ************************************* REGINA CONFIGURATION **************************************
# .__
# _______ ____ ____ |__| ____ _____
# \_ __ \_/ __ \ / ___\| |/ \\__ \
# | | \/\ ___// /_/ > | | \/ __ \_
# |__| \___ >___ /|__|___| (____ /
# \/_____/ \/ \/
# *************************************************************************************************
[ regina ]
# name of the server or website
# will be available as a variable in the generated website as %server_name
# string
server_name =
# database path. if not specified, use xdg-data-home/regina/<server-name>
# eg: /home/my_user/regina/my_website.db
# path or empty
database =
[ data-collection ]
# path to the nginx access log to parse
# eg: /var/log/nginx/access.log
# path (read permissions)
access_log =
# nginx locations and their root directory: location:directory,location:directory,...
# eg: /:/www/my_website,/error:/www/error
locs_and_dirs =
# filetypes that should be grouped (comma separated)
# eg: png,jpg,jpeg,gif,svg,css,ico,pdf,txt
auto_group_filetypes =
# group certain files
# eg: home:index.html,home.html;images:image1.png,image2.png
[ data-visualization ]
# template html input
# eg: /home/my_user/.regina/template.html
# path (read permissions)
template_html =
# output for the generated html
# eg: /www/analytics/statistics.html
# path (write permissions)
html_out_path =
# output directory for the generated plots
# WARNING: you have to create the directory yourself, regina will not create it
# eg: /www/analytics/images
# path (directory with write permissions)
img_out_dir =
# nginx location for the generated images, its root must be img_out_dir
# eg: images
img_location =
# if the root for your server is /www/analytics and html_out_path is /www/analytics/analytics.html,
# use img_dir = /www/analytics/images and img_location = /images
[ route_groups ]
images =
# whether a request with 30x http status counts as success
status_300_is_success = False
# if False, a unique visitor is an (ip-address, visitor agent) pair; if True, only the ip address
unique_visitor_is_ip_address = False
# whether a visitor needs to make at least 1 successful request to be a human
human_needs_success = True
# don't collect requests to locations that fully match this regex
# eg: /analytics.*
request_location_regex_blacklist =
[ geoip ]
get_visitor_location = False
# this option is only relevant when --update-geoip is used
# list of capitalized ISO 3166-1 alpha-2 country codes for which the location needs to be resolved at city level, not country level
# for EU, use: get_cities_for_countries = AT, BE, BG, HR, CY, CZ, DK, EE, FI, FR, DE, GR, HU, IE, IT, LV, LT, LU, MT, NL, PL, PT, RO, SK, SI, ES, SE
get_cities_for_countries =
# hash_ip_address = False
# ***************************************** VISUALIZATION *****************************************
# these settings can be changed at any point in time as they only affect the visualization of the data
# *************************************************************************************************
[ visualization ]
# separate visitors into all and humans
# True/False
get_human_percentage = True
# generate a country and city ranking
# True/False
do_geoip_rankings = False
# only use humans for geoip rankings
# True/False
geoip_only_humans = True
# eg exclude unknown cities: City in .*
# regex
city_ranking_regex_blacklist = City in .*
# regex
country_ranking_regex_blacklist =
# ignore the protocol in referers, so https://example.com = http://example.com -> example.com
referer_ranking_ignore_protocol = True
# ignore the subdomains in referers, so sub.example.com = example.com -> example.com
referer_ranking_ignore_subdomain = False
# ignore the location in referers, so example.com/index.html = example.com -> example.com
referer_ranking_ignore_location = True
# regex expression as whitelist for referer ranking, minus means empty
# eg exclude empty referers: ^[^\-].*
referer_ranking_regex_whitelist = ^[^\-].*
# regex expression as whitelist for file ranking
# eg .*\.((txt)|(html)|(css)|(php)|(png)|(jpeg)|(jpg)|(svg)|(gif)) to only show these files
# regex
route_ranking_regex_whitelist =
# maximum number of route (group)s on the file ranking
# int
route_ranking_plot_max_routes = 20
# whether to ignore non existing files in the ranking
# True/False
route_ranking_ignore_error_files = True
# int
plot_dpi = 300
# affects visitor/request count plot, geoip rankings, file ranking and referer ranking
plot_size_broad = 14, 5
# affects platform and browser ranking
plot_size_narrow = 7, 5
# ******************************************** REGINA *********************************************
# these settings affect the behavior of regina
# *************************************************************************************************
# print lots! of debug messages to help you find problems
debug = False
Normal file
Normal file
@ -0,0 +1,166 @@
# ************************************* REGINA CONFIGURATION **************************************
# .__
# _______ ____ ____ |__| ____ _____
# \_ __ \_/ __ \ / ___\| |/ \\__ \
# | | \/\ ___// /_/ > | | \/ __ \_
# |__| \___ >___ /|__|___| (____ /
# \/_____/ \/ \/
# *************************************************************************************************
# Common Settings
[ regina ]
# name (not url) of the server or website
# will be available as variable for the generated html as %server_name
# type: string
# server_name = my_website
server_name =
# database path
# type: file (read, write permissions)
# database = /home/my_user/regina/my_website.db
database =
# path to the nginx access log to parse
# type: file (read permissions)
# access_log = /var/log/nginx/access.log
access_log =
# The template and generated file do not actually have to be HTML files, you can change them to whatever you want
[ html-generation ]
# type: True/False
generate_html = True
# template html input
# type: file (read permissions)
# template_html = /home/my_visitor/.regina/template.html
template_html =
# output for the generated html
# type: file (write permissions)
# html_out_path = /www/analytics/statistics.html
html_out_path =
# output directory for the generated plots
# type: directory (write permissions)
# img_out_dir = /www/analytics/images
img_out_dir =
# nginx location for the generated images (this has to map to img_out_dir)
# type: eg: images
# img_location = /images
img_location =
# These settings affect the data collection. If changed, they will affect how the database is being filled in the future.
[ data-collection ]
# whether a unique visitor is only identified by IP address
# type: True/False
unique_visitor_is_ip_address =
# whether a visitor needs at least one successful request to be a human
# type: True/False
human_needs_success = True
# whether a request with 30x HTTP status counts as successful request
# type: True/False
status_300_is_success = True
# delete all ip addresses after the collection is done
# type: True/False
delete_ip_addresses = True
# don't collect requests to locations that match this regex
# type: regexp, None, int or string
# request_location_blacklist = /analytics.*
request_location_blacklist =
# whether to get visitor location information
# type: True/False
get_visitor_location =
# whether to generate country and city rankings using GeoIP (requires GeoIP Database)
# type: True/False
do_geoip_rankings =
# countries for which the GeoIP needs to be resolved at city level
# type: list of capitalized ISO 3166-1 alpha-2 country codes
# get_cities_for_countries = AT, BE, BG, HR, CY, CZ, DK, EE, FI, FR, DE, GR, HU, IE, IT, LV, LT, LU, MT, NL, PL, PT, RO, SK, SI, ES, SE
get_cities_for_countries =
# whether to use only humans for GeoIP rankings (requires GeoIP Database)
# type: True/False
geoip_only_humans = True
[ rankings ]
# Explanation for blacklists and whitelists:
# If a blacklist is given: values that fully match the blacklist are excluded
# If a whitelist is given: values that do not fully match the whitelist are excluded
# Both are optional: you can provide either, neither or both
# type: regexp or None
# city_ranking_blacklist = City in .*
city_ranking_blacklist =
# type: regexp or None
city_ranking_whitelist =
# type: regexp or None
country_ranking_blacklist =
# type: regexp or None
country_ranking_whitelist =
# type: regexp or None
# route_ranking_blacklist = .*\.((css)|(txt))
route_ranking_blacklist =
# type: regexp or None
# route_ranking_whitelist = .*\.((php)|(html)|(php)|(png)|(jpeg)|(jpg)|(svg)|(gif))
route_ranking_whitelist =
# maximum number of entries in route ranking
# type: int
route_ranking_plot_max_routes = 20
# whether to ignore non-existing routes in ranking
# type: True/False
route_ranking_ignore_404 = True
# type: regexp or None
# referer_ranking_blacklist = Example: exclude '-' (nginx sets this when there is no referer)
referer_ranking_blacklist = -
# type: regexp or None
referer_ranking_whitelist =
# whether to ignore protocol in referer ranking (if True: https://example.com == http://example.com -> example.com)
# type: True/False
referer_ranking_ignore_protocol = True
# whether to ignore subdomains in referer ranking (if True: sub.example.com == example.com -> example.com)
# type: True/False
referer_ranking_ignore_subdomain =
# whether to ignore route in referer ranking (if True: example.com/route == example.com -> example.com)
# type: True/False
referer_ranking_ignore_route = True
[ plots ]
# DPI for plots
# type: int
plot_dpi = 300
# plot size for broad plots: width, height
# type: int, int
plot_size_broad = 14, 5
# plot size for narrow plots: width, height
# type: int, int
plot_size_narrow = 7, 5
# *************************************************************************************************
# *************************************************************************************************
@ -5,18 +5,19 @@ from sys import argv, exit
from os.path import isfile
import sqlite3 as sql
if __name__ == "__main__":
import argparse
if __name__ == "__main__": # make relative imports work as described here:
if __package__ is None:
# make relative imports work as described here:
__package__ = "regina"
import sys
from os import path
filepath = path.realpath(path.abspath(__file__))
sys.path.insert(0, path.dirname(path.dirname(filepath)))
from .db_operation.collect import parse_log, add_requests_to_db, update_ip_range_id
from .db_operation.database import create_db, update_geoip_tables, t_visitor
from .db_operation.visualize import visualize
from .data_collection.parse_log import parse_log
from .database import Database
from .data_visualization import visualize
from .utility.settings_manager import read_settings_file
from .utility.globals import settings, version
from .utility.utility import pmessage
@ -74,81 +75,56 @@ def error(arg):
print("Error:", arg)
def main():
config_file = ""
collect = False
visualize_ = False
log_file = ""
geoip_city_csv = ""
# parse args
i = 1
while i in range(1, len(argv)):
if argv[i] in ["--config", "-c"]:
if len(argv) > i + 1: config_file = argv[i+1]
else: missing_arg_val(argv[i])
elif argv[i] == "--log-file":
if len(argv) > i + 1: log_file = argv[i+1]
else: missing_arg_val(argv[i])
if argv[i] == "--update-geoip":
if len(argv) > i + 1: geoip_city_csv = argv[i+1]
else: missing_arg_val(argv[i])
elif argv[i] in ["--help", "-h"]:
elif argv[i] == "--collect":
collect = True
elif argv[i] == "--visualize":
visualize_ = True
i += 1
if not (collect or visualize_ or geoip_city_csv):
missing_arg("--visualize or --collect or --update-geoip")
if not config_file:
if not isfile(config_file):
error(f"Not a file: '{config_file}'")
read_settings_file(config_file, settings)
def main2():
parser = argparse.ArgumentParser(prog="regina")
parser.add_argument("--config", "-c", action="store", help="path to a config file that specifies all the other parameters", metavar="config-file", required=True)
parser.add_argument("--update-geoip", action="store", help="path to IP-COUNTRY-REGION-CITY database in csv format", metavar="geoip-csv")
parser.add_argument("--visualize", action="store_true", help="generate the visualization website")
parser.add_argument("--collect", action="store_true", help="fill the database from the nginx access log")
parser.add_argument("--log-file", action="store", help="use alternate logfile than what is set in the config file", metavar="log-file")
args = parser.parse_args()
if not (args.collect or args.visualize or args.update_geoip):
parser.error("at least one of --visualize, --collect, or --update-geoip is required.")
if not path.isfile(args.config):
parser.error(f"invalid path to configuration file: '{args.config}'")
read_settings_file(args.config, settings)
settings["version"] = version
if log_file: settings["access_log"] = log_file
if args.log_file:
settings["access_log"] = args.log_file
if not settings["server_name"]: missing_arg("server-name")
if not settings["access_log"]: missing_arg("log")
if not settings["db"]: missing_arg("db")
if isinstance(settings["auto_group_filetypes"], str):
settings["auto_group_filetypes"] = settings["auto_group_filetypes"].split(",")
if isinstance(settings["locs_and_dirs"], str):
settings["locs_and_dirs"] = [ loc_and_dir.split(":") for loc_and_dir in settings["locs_and_dirs"].split(",") ]
if not settings["server_name"]:
error("'server-name' is missing in the configuration file.")
if not isfile(config_file):
error(f"Not a file: '{config_file}'")
if not settings["access_log"]:
error("'log' is missing in the configuration file.")
if not settings["db"]:
error("'db' is missing in the configuration file.")
if not isfile(settings["db"]):
create_db(settings["db"], settings["filegroups"], settings["locs_and_dirs"], settings["auto_group_filetypes"])
db = Database(settings["db"])
# if not isfile(settings["db"]):
# create_db(settings["db"], settings["filegroups"], settings["locs_and_dirs"], settings["auto_group_filetypes"])
if geoip_city_csv:
if not isfile(geoip_city_csv):
error(f"Not a file: '{geoip_city_csv}'")
conn = sql.connect(settings['db'], isolation_level=None) # required vor vacuum
cur = conn.cursor()
update_geoip_tables(cur, geoip_city_csv)
if args.update_geoip:
if not isfile(args.update_geoip):
error(f"Not a file: '{args.update_geoip}'")
# update visitors
for visitor_id in range(sql_tablesize(cur, t_visitor)):
update_ip_range_id(cur, visitor_id)
if collect:
for (visitor_id) in db(f"SELECT visitor_id FROM visitor"):
if args.collect:
pmessage(f"regina version {version} with server-name '{settings['server_name']}', database '{settings['db']}' and logfile '{settings['access_log']}'")
requests = parse_log(settings["access_log"])
add_requests_to_db(requests, settings["db"])
if visualize_:
if args.visualize:
pmessage(f"regina version {version} with server-name '{settings['server_name']}', database '{settings['db']}'")
if not isfile(settings["db"]): error(f"Invalid database path: '{settings['db']}'")
if __name__ == '__main__':
@ -67,5 +67,5 @@ CREATE TABLE IF NOT EXISTS city(
Normal file
Normal file
Binary file not shown.
Normal file
Normal file
@ -0,0 +1,34 @@
def get_files_from_dir_rec(p: str, files: list[str]):
    """Recursively append every regular file found at or below p to files.

    p: path of a file or a directory; a path that is neither is ignored.
    files: list that is extended in place; nothing is returned.
    """
    if path.isfile(p):
        # A regular file is the recursion's base case: record it.
        files.append(p)
    elif path.isdir(p):
        for p_ in listdir(p):
            # Children are addressed as "<parent>/<name>".
            get_files_from_dir_rec(p + "/" + p_, files)
def create_filegroups(cursor: sql.Cursor, filegroup_str: str):
# Purpose (from the visible calls): split filegroup_str into named groups and
# register each group and its files through the sql_* helpers on cursor.
TODO: make re-usable (alter groups when config changes)
# filegroup_str: 'name1: file1, file2, file3; name2: file33'
# Leading/trailing ';' are stripped first so they do not produce empty groups.
groups = filegroup_str.strip(";").split(";")
pdebug("create_filegroups:", groups)
for group in groups:
# Unpacking into two names raises ValueError unless the group contains exactly one ':'.
name, vals = group.split(":")
# create/get group
# NOTE(review): the table-name argument is an empty string in every sql_* call
# in this function - confirm against the helper signatures.
if sql_exists(cursor, "", [("groupname", name)]):
group_id = sql_select(cursor, "", [("groupname", name)])[0][0]
# NOTE(review): presumably the next two lines belong to an else branch
# (allocate a new id only when the group does not exist yet) - confirm.
group_id = sql_max(cursor, "", "group_id") + 1
sql_insert(cursor, "", [(group_id, name)])
# pdebug("create_filegroups: group_id", group_id)
# create/edit file
# File names are split on ',' without trimming surrounding whitespace.
for filename in vals.split(","):
if sql_exists(cursor, "", [("filename", filename)]): # if exist, update
# NOTE(review): the WHERE clause compares against the literal 'fil', not the
# loop variable filename, and the statement is built by string interpolation
# rather than with bound parameters - confirm both are intended.
cursor.execute(f"UPDATE file SET group_id = {group_id} WHERE filename = 'fil'")
# NOTE(review): presumably the insert below is the else branch of the check above - confirm.
sql_insert(cursor, "", [[filename, group_id]])
@ -2,57 +2,9 @@
import os
version = "1.0"
version = "2.0"
# default settings, these are overwriteable through a config file
settings = {
"server_name": "default_sever",
"access_log": "",
"db": "",
"locs_and_dirs": [],
"auto_group_filetypes": [],
"filegroups": "",
"request_location_regex_blacklist": "",
"request_is_same_on_same_day": True, # mutiple requests from same visitor to same file at same day are counted as 1
"unique_visitor_is_ip_address": False,
"get_visitor_location": False,
"get_cities_for_countries": [""], # list if country codes for which the ip address ranges need to be collected at city level, not country level
"hash_ip_address": True,
"get_human_percentage": False,
"human_needs_success": True, # a human must have at least 1 successful request (status < 300)
"status_300_is_success": False, # 300 codes are success
"do_geoip_rankings": False,
"geoip_only_humans": True,
"city_ranking_regex_blacklist": "",
"country_ranking_regex_blacklist": "",
# "file_ranking_regex_whitelist": r".*\.((txt)|(html)|(css)|(php)|(png)|(jpeg)|(jpg)|(svg)|(gif))",
"file_ranking_regex_whitelist": r".*\.(html)",
"file_ranking_ignore_error_files": False, # skip files that only had unsuccessful requests (status < 300)
"referer_ranking_ignore_protocol": True,
"referer_ranking_ignore_subdomain": False,
"referer_ranking_ignore_location": True,
"referer_ranking_ignore_tld": False,
"referer_ranking_regex_whitelist": r"^[^\-].*", # minus means empty
"visitor_agent_ranking_regex_whitelist": r"",
"file_ranking_plot_max_files": 15,
# "plot_figsize": (60, 40),
"plot_dpi": 300,
"plot_add_count_label": True,
"plot_size_broad": (10, 5),
"plot_size_narrow": (6.5, 5),
"img_dir": "",
"img_location": "",
"img_filetype": "svg",
"template_html": "",
"html_out_path": "",
"last_x_days": 30,
# regina
"debug": False
# these oses and browser can be detected:
# lower element takes precedence
@ -1,3 +1,298 @@
from configparser import ConfigParser
Classes and methods for managing regina configuration
Using CFG_File and CFG_Entry, you set defaults and type restrictions for
a dictionary like ReginaSettings object and also export the defaults as a .cfg file
def comment(s):
    """Turn the text s into a block of "# "-prefixed comment lines.

    Every newline in s is followed by a "# " marker, any '#' and space
    characters at the two ends of the result are trimmed, and a single "# "
    is put in front.
    """
    continued = "\n# ".join(s.split("\n"))
    trimmed = continued.strip("# ")
    return "# " + trimmed
# for eventual later type checking
class regexp:
    """Marker type that represents a regular expression.

    The class carries no behaviour of its own; it only serves as a type tag
    for values that are expected to be regular expressions.
    """
class Path:
    """Represents a path together with the permissions required on it.

    permissions: string of permission flags; 'r', 'w' and 'x' are recognised.
    is_dir: True if the path is a directory, False if it is a file.
    """

    def __init__(self, permissions="r", is_dir=False):
        self.is_dir = is_dir
        self.permissions = permissions

    def __repr__(self):
        """Return a description such as "file (read, write permissions)"."""
        kind = "directory" if self.is_dir else "file"
        flag_names = (("r", "read"), ("w", "write"), ("x", "execute"))
        wanted = [name for flag, name in flag_names if flag in self.permissions]
        if not wanted:
            # No recognised flag: describe only the kind of path instead of
            # emitting a dangling " permissions)" suffix.
            return kind
        return f"{kind} ({', '.join(wanted)} permissions)"
class CFG_Entry:
    """
    key - value pair in a cfg file
    extra parameters for comments on top of the key - value pair
    """
    types = str|Path|None|type[regexp]|type[str]|type[bool]|type[int]

    def __init__(self, key, dflt=None, typ_: types|list[types]|tuple[types] =str, desc="", exam=""): # all 4 letters -> nice indent
        """
        @param key:  name of the setting
        @param dflt: default value written after "key = "; None writes no value
        @param typ_: type for the value:
            use list of types if multiple types are allowed
            use tuple of types for tuple of types
        @param desc: description, written as a comment above the entry
        @param exam: example value, written as a commented-out "key = example" line
        """
        self.key = key
        self.default = dflt
        self.type_ = typ_
        self.descripton= desc  # attribute name kept as-is: it is part of the object's interface
        self.example = exam

    def type_str(self):
        """Return a human readable description of the allowed type(s)."""
        def _type_str(t):
            # A plain string is used verbatim as the type description.
            if isinstance(t, str): return t
            if t is None: return "None"
            if t is str: return "string"
            if t is bool: return "True/False"
            if t is int: return "int"
            if t is float: return "float"
            if t is regexp: return "regexp"
            if isinstance(t, Path): return str(t)
            try:
                return t.__name__
            except AttributeError:
                return str(t)

        if isinstance(self.type_, list):
            # Alternatives: "a, b or c"
            names = [_type_str(t) for t in self.type_]
            if len(names) > 1:
                return ", ".join(names[:-1]) + " or " + names[-1]
            return "".join(names)
        if isinstance(self.type_, tuple):
            # Tuple of values: "a, b"
            return ", ".join(_type_str(t) for t in self.type_)
        return _type_str(self.type_)

    def __repr__(self):
        """Return the entry as cfg text: comment lines followed by "key = default"."""
        s = ""
        if self.descripton: s += f"{comment(self.descripton)}\n"
        if self.type_: s += f"{comment('type: ' + self.type_str())}\n"
        if self.example: s += comment(f"{self.key} = {self.example}\n")
        s += f"{self.key} = "
        # Compare against None so that falsy defaults such as False or 0 are still written.
        if self.default is not None: s += f"{self.default}"
        s += "\n"
        return s
class CFG_File:
    """
    represents a cfg file
    use the __repr__ method to export to a file
    """

    def __init__(self, header="", footer=""):
        """
        @param header: text written as a comment block at the top of the file
        @param footer: text written as a comment block at the end of the file
        """
        self.sections = [] # (name, desc, entries)
        self.header = header
        self.footer = footer

    def add_section(self, name:str, entries: list[CFG_Entry|str], desc=""):
        """
        Append a section to the file.
        @param name:    section name, written as "[ name ]"
        @param entries: entries (or plain strings) written below the section header
        @param desc:    optional description, written as a comment above the section header
        """
        self.sections.append((name, desc, entries))

    def __repr__(self):
        """Return the whole file as text: header, every section with its entries, footer."""
        parts = [comment(self.header) + "\n"]
        for name, desc, entries in self.sections:
            if desc: parts.append(f"\n{comment(desc)}")
            parts.append(f"\n[ {name} ]\n")
            # Each entry is followed by a newline of its own.
            parts.extend(f"{entry}\n" for entry in entries)
        parts.append(comment(self.footer))
        return "".join(parts)
if __name__ == "__main__":
cfg = CFG_File(header=r"""
************************************* REGINA CONFIGURATION **************************************
_______ ____ ____ |__| ____ _____
\_ __ \_/ __ \ / ___\| |/ \\__ \
| | \/\ ___// /_/ > | | \/ __ \_
|__| \___ >___ /|__|___| (____ /
\/_____/ \/ \/
************************************************************************************************* """.strip(" \n"), footer=r"""
""".strip(" \n"))
cfg.add_section("regina", desc="Common Settings", entries=[
desc="name (not url) of the server or website\nwill be avaiable as variable for the generated html as %server_name",
desc="database path",
desc="path to the nginx access log to parse",
cfg.add_section("html-generation", desc="The template and generated file do actually have to be htmls, you can change it to whatever you want", entries=[
desc="template html input",
desc="output for the generated html",
desc="output directory for the generated plots",
typ_=Path(permissions="w", is_dir=True),
desc="nginx location for the generated images (this has to map to img_out_dir)",
typ_="eg: images",
cfg.add_section("data-collection", desc="These settings affect the data collection. If changed, they will affect how the database is being filled in the future.", entries=[
desc="whether a unique visitor is only identified by IP address",
desc="whether a visitor needs at least one successful request to be a human",
desc="whether a request with 30x HTTP status counts as successful request",
CFG_Entry("delete_ip_addresses", # TODO: Implement
desc="delete all ip addresses after the collection is done",
desc="don't collect requests to locations that match this regex",
typ_=[regexp, None],
desc="whether to get visitor location information",
CFG_Entry("do_geoip_rankings", # TODO: is used?
desc="whether to generate country and city rankings using GeoIP (requires GeoIP Database)",
desc="countries for which the GeoIP needs to be resolved at city level",
typ_="list of capitalized ISO 3166-1 alpha-2 country codes",
exam="AT, BE, BG, HR, CY, CZ, DK, EE, FI, FR, DE, GZ, HU, IE, IT, LV, LT, LU, MT, NL, PL, PT, RO, SK, SI, ES, SE"),
CFG_Entry("geoip_only_humans", # TODO: is used?
desc="whether to use only humans for GeoIP rankings (requires GeoIP Database)",
# cfg.add_section("data-visualization", desc="", entries=[
cfg.add_section("rankings", desc="", entries=[
Explanation for blacklists and whitelists:
If a blacklist is given: values that fully match the blacklist are excluded
If a whitelist is given: values that do not fully match the whitelist are excluded
Both are optional: you can provide, none or both
typ_=[regexp, None],
exam="City in .*"),
typ_=[regexp, None]),
typ_=[regexp, None]),
typ_=[regexp, None]),
typ_=[regexp, None],
typ_=[regexp, None],
desc="maximum number of entries in route ranking",
desc="whether to ignore non-existing routes in ranking",
# TODO add groups
# Entry("route_groups",
# desc="route groups for images",
# typ_=[regexp, None],
# exam="*.gif, *.jpeg, *.jpg, *.png, *.svg".replace(", ", "\n")),
typ_=[regexp, None],
exam="Example: exclude '-' (nginx sets this when there is no referer)"),
typ_=[regexp, None]),
desc="whether to ignore protocol in referer ranking (if True: == ->",
desc="whether to ignore subdomains inreferer ranking (if True: == ->",
desc="whether to ignore route in referer ranking (if True: == ->",
cfg.add_section("plots", desc="", entries=[
desc="DPI for plots",
dflt="14, 5",
desc="plot size for broad plots: width, heigh",
typ_=(int, int)),
dflt="7, 5",
desc="plot size for narrow plots: width, height",
typ_=(int, int)),
with open("generated-default.cfg", "w") as file:
def get_bool(bool_str: str, fallback=False):
if bool_str in ["true", "True"]: return True
@ -53,3 +348,72 @@ def read_settings_file(filepath: str, settings:dict, ignore_invalid_lines=True,
else: continue
settings[vals[0]] = vals[1].strip(" ")
class ReginaSettings:
    """
    Dictionary-like container for the regina settings.

    Starts out with the default settings; values are read with settings[key]
    and changed with settings[key] = value, which enforces that the type of
    an existing setting does not change.
    """

    def __init__(self, config_file):
        """
        @param config_file: path to the configuration file
            NOTE(review): the file is not read yet, only the defaults below are set up
        """
        parser = ConfigParser()
        # with open(config_file, "r") as file
        # default settings, these are overwriteable through a config file
        self._settings = {
            "server_name": "default_sever",
            "access_log": "",
            "db": "",
            "locs_and_dirs": [],
            "auto_group_filetypes": [],
            "filegroups": "",
            "request_location_blacklist": "",
            "request_is_same_on_same_day": True,  # multiple requests from same visitor to same file at same day are counted as 1
            "unique_visitor_is_ip_address": False,
            "get_visitor_location": False,
            "get_cities_for_countries": [""],  # list of country codes for which the ip address ranges need to be collected at city level, not country level
            "hash_ip_address": True,

            "get_human_percentage": False,
            "human_needs_success": True,  # a human must have at least 1 successful request (status < 300)
            "status_300_is_success": False,  # 300 codes are success
            "do_geoip_rankings": False,
            "geoip_only_humans": True,
            "city_ranking_blacklist": "",
            "country_ranking_blacklist": "",
            # "file_ranking_whitelist": r".*\.((txt)|(html)|(css)|(php)|(png)|(jpeg)|(jpg)|(svg)|(gif))",
            "file_ranking_whitelist": r".*\.(html)",
            "file_ranking_ignore_error_files": False,  # skip files that only had unsuccessful requests (status < 300)
            "referer_ranking_ignore_protocol": True,
            "referer_ranking_ignore_subdomain": False,
            "referer_ranking_ignore_location": True,
            "referer_ranking_ignore_tld": False,
            "referer_ranking_whitelist": r"^[^\-].*",  # minus means empty
            "visitor_agent_ranking_whitelist": r"",
            "file_ranking_plot_max_files": 15,
            # "plot_figsize": (60, 40),
            "plot_dpi": 300,
            "plot_add_count_label": True,
            "plot_size_broad": (10, 5),
            "plot_size_narrow": (6.5, 5),
            "img_dir": "",
            "img_location": "",
            "img_filetype": "svg",
            "template_html": "",
            "html_out_path": "",
            "last_x_days": 30,
            # regina
            "debug": False
        }

    def __getitem__(self, key):
        """Return the value of the setting key; raises KeyError for unknown keys."""
        return self._settings[key]

    def __setitem__(self, key, value):
        """
        set key to value.
        if key already exists, TypeError is raised if value is not of the same type as the current value
        """
        if key in self._settings:
            # Exact type comparison: a bool is not accepted for an int setting and vice versa.
            if type(value) is not type(self._settings[key]):
                raise TypeError(f"ReginaSettings: Trying to set value of '{key}' to '{value}' of type '{type(value)}', but the current type is '{type(self._settings[key])}'.")
        self._settings[key] = value
@ -2,6 +2,7 @@
# print(f"{__file__}: __name__={__name__}, __package__={__package__}, sys.path[0]={path[0]}")
from sys import exit
from os import path
from re import fullmatch
from regina.utility.globals import settings
@ -9,6 +10,29 @@ from regina.utility.globals import settings
Various utilities
def is_whitelisted(val: str, whitelist: str|list[str]|None):
Check if val is in a regex whitelist
whitelist: regexp, list of regexp or None
if whitelist is None, always return True
if not whitelist: return True
if type(whitelist) == str:
return fullmatch(whitelist, val)
if type(whitelist) == list:
for w in whitelist:
if not fullmatch(w, val): return False
return True
def is_blacklisted(val: str, blacklist: str|list[str]|None):
Check if val is in a regex blacklist
blacklist: regexp, list of regexp or None
if blacklist is None, always return False
return not is_whitelisted(val, blacklist)
def pdebug(*args, **keys):
    """Forward all arguments to print, but only while the "debug" setting is truthy."""
    if not settings["debug"]:
        return
    print(*args, **keys)
Normal file
Normal file
Binary file not shown.
Reference in New Issue
Block a user