This commit is contained in:
2026-05-20 08:41:59 +02:00
parent 4b2e1455c9
commit e356879542
12 changed files with 816 additions and 14 deletions

View File

@@ -0,0 +1,352 @@
#!/usr/bin/env python3
"""
Convert HAR files to readable CSV files.
Output 1: har_entries.csv
One row per entry in log.entries. This is the most direct way to inspect
the HAR structure: each { ... } inside entries[] becomes one CSV row.
Output 2: har_summary.csv
One row per HAR file with simple totals.
The script does not remove cookie values or URLs. Treat the output as sensitive.
"""
from __future__ import annotations
import argparse
import csv
import json
from pathlib import Path
from urllib.parse import parse_qs, urlparse
# Column order of the per-entry CSV; passed as ``fieldnames`` to the CSV writer.
ENTRY_FIELDS = [
    # Which file and which entry the row comes from.
    "har_filename", "search_engine", "entry_index",
    # Timing and request line.
    "startedDateTime", "time_ms", "method",
    # URL and the pieces derived from it.
    "url", "domain", "path", "query_text",
    # Response status.
    "status", "statusText",
    # Cookies sent with the request.
    "request_cookie_count", "request_cookie_names", "request_cookie_values",
    # Cookies set by the response.
    "response_cookie_count", "response_cookie_names", "response_cookie_values",
    # Query-string parameters.
    "query_param_count", "query_param_names", "query_param_values",
    # Header counts and body presence.
    "request_header_count", "response_header_count", "post_data_present",
    # Sizes.
    "request_body_size", "response_body_size", "response_content_size",
    "transferred_bytes_approx",
    # Heuristic flags.
    "is_third_party_domain", "tracking_hint",
]
# Column order of the per-file summary CSV; passed as ``fieldnames`` to the CSV writer.
SUMMARY_FIELDS = [
    # Which capture the row describes.
    "har_filename", "search_engine", "query_text",
    # Request and domain totals.
    "requests_total", "unique_domains", "third_party_requests",
    # Cookie and parameter totals.
    "request_cookies_total", "response_cookies_total", "query_params_total",
    # Method and heuristic totals.
    "post_requests_total", "tracking_hint_requests",
    # Volume and timing.
    "transferred_kb_approx", "page_load_ms",
    # Responses bucketed by status class.
    "status_2xx", "status_3xx", "status_4xx", "status_5xx",
]
# Lower-case fragments that has_tracking_hint() looks for as plain substrings
# of the lower-cased domain, path and URL of a request.
TRACKING_WORDS = [
    "ads", "adservice", "analytics", "collect", "conversion",
    "doubleclick", "event", "gen_204", "googleadservices", "improving",
    "log", "metrics", "pagead", "telemetry", "track",
]
def detect_search_engine(har_path: Path) -> str:
    """Guess the search engine from the HAR file name.

    Args:
        har_path: Path of the HAR file; only its final component is inspected,
            case-insensitively.

    Returns:
        "DuckDuckGo" or "Google" when the file name contains that word
        ("duckduckgo" is checked first), otherwise "Unknown".
    """
    lowered = har_path.name.lower()
    # Order matters: the first matching fragment wins.
    for fragment, label in (("duckduckgo", "DuckDuckGo"), ("google", "Google")):
        if fragment in lowered:
            return label
    return "Unknown"
def read_har(path: Path) -> dict:
    """Load a HAR file and return the parsed JSON document.

    Args:
        path: Location of the HAR (JSON) file.

    Returns:
        The decoded top-level JSON value.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the content is not valid JSON.
    """
    # "utf-8-sig" decodes plain UTF-8 exactly like "utf-8" but also drops a
    # leading byte-order mark, which json.load would otherwise reject.
    # Undecodable bytes are replaced rather than aborting the read.
    with path.open(encoding="utf-8-sig", errors="replace") as file:
        return json.load(file)
def entries_from_har(har_data: dict) -> list[dict]:
    """Return the ``log.entries`` list of a parsed HAR document.

    Args:
        har_data: Parsed HAR document.

    Returns:
        The entries list, or an empty list when "log" or "entries" is
        missing, null or empty.
    """
    # ``or {}`` also covers an explicit ``"log": null``, which a plain
    # ``.get("log", {})`` would pass through as None.
    log = har_data.get("log") or {}
    return log.get("entries") or []
def pages_from_har(har_data: dict) -> list[dict]:
    """Return the ``log.pages`` list of a parsed HAR document.

    Args:
        har_data: Parsed HAR document.

    Returns:
        The pages list, or an empty list when "log" or "pages" is missing,
        null or empty.
    """
    # ``or {}`` also covers an explicit ``"log": null``, which a plain
    # ``.get("log", {})`` would pass through as None.
    log = har_data.get("log") or {}
    return log.get("pages") or []
def cookie_names(cookies: list[dict]) -> str:
    """Join the "name" of every cookie with "|".

    Args:
        cookies: Cookie objects (dicts) from a HAR request or response.

    Returns:
        One "|"-separated string; a missing or null name contributes an
        empty segment and any other non-string name is converted with str().
    """
    names = []
    for cookie in cookies:
        name = cookie.get("name")
        # str.join only accepts strings; one malformed cookie should not
        # abort the whole export.
        names.append("" if name is None else str(name))
    return "|".join(names)
def cookie_values(cookies: list[dict]) -> str:
    """Join the "value" of every cookie with "|".

    Args:
        cookies: Cookie objects (dicts) from a HAR request or response.

    Returns:
        One "|"-separated string; a missing or null value contributes an
        empty segment and any other non-string value is converted with str().
    """
    values = []
    for cookie in cookies:
        value = cookie.get("value")
        # str.join only accepts strings; one malformed cookie should not
        # abort the whole export.
        values.append("" if value is None else str(value))
    return "|".join(values)
def query_names(query_items: list[dict]) -> str:
    """Join the "name" of every query-string item with "|".

    Args:
        query_items: Items (dicts) from a HAR request's "queryString" list.

    Returns:
        One "|"-separated string; a missing or null name contributes an
        empty segment and any other non-string name is converted with str().
    """
    names = []
    for item in query_items:
        name = item.get("name")
        # str.join only accepts strings; one malformed item should not
        # abort the whole export.
        names.append("" if name is None else str(name))
    return "|".join(names)
def query_values(query_items: list[dict]) -> str:
    """Join the "value" of every query-string item with "|".

    Args:
        query_items: Items (dicts) from a HAR request's "queryString" list.

    Returns:
        One "|"-separated string; a missing or null value contributes an
        empty segment and any other non-string value is converted with str().
    """
    values = []
    for item in query_items:
        value = item.get("value")
        # str.join only accepts strings; one malformed item should not
        # abort the whole export.
        values.append("" if value is None else str(value))
    return "|".join(values)
def positive_number(value: object) -> int:
    """Return ``value`` truncated to an int when it is a finite number above zero.

    Args:
        value: Any object, typically a size field read from a HAR entry.

    Returns:
        ``int(value)`` for a finite int or float greater than zero; 0 for
        everything else (non-numbers, booleans, zero, negatives, NaN and
        infinity).
    """
    # bool is a subclass of int, but True/False are not sizes.
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return 0
    # ``not value > 0`` also rejects NaN; infinity would make int() raise
    # OverflowError, and json.load does accept "Infinity" and "NaN".
    if not value > 0 or value == float("inf"):
        return 0
    return int(value)
def approximate_transferred_bytes(entry: dict) -> int:
    """Estimate how many bytes one HAR entry moved over the network.

    The estimate is request headers + request body + response headers +
    response body. Sizes that are missing, non-numeric or not positive count
    as zero (see positive_number).

    Args:
        entry: One item of ``log.entries``.

    Returns:
        The approximate byte count as a non-negative int.
    """
    request = entry.get("request", {}) or {}
    response = entry.get("response", {}) or {}
    content = response.get("content", {}) or {}

    # Per HAR 1.2, response.bodySize is the received body size and
    # content.size is the length of the returned content, so adding both
    # counts the body twice. Use bodySize when it is known (>= 0) and fall
    # back to content.size only when it is unknown (-1, missing or invalid).
    body_size = response.get("bodySize")
    body_size_known = (
        isinstance(body_size, (int, float))
        and not isinstance(body_size, bool)
        and body_size >= 0
    )
    if body_size_known:
        response_body = positive_number(body_size)
    else:
        response_body = positive_number(content.get("size"))

    return (
        positive_number(request.get("headersSize"))
        + positive_number(request.get("bodySize"))
        + positive_number(response.get("headersSize"))
        + response_body
    )
def extract_query_text_from_url(url: str) -> str:
    """Return the first ``q`` query parameter of ``url``.

    Args:
        url: The request URL.

    Returns:
        The decoded value of the first ``q`` parameter (possibly an empty
        string when it is present but blank), or "" when there is no ``q``.
    """
    params = parse_qs(urlparse(url).query, keep_blank_values=True)
    # Take the first value if any; fall back to "" when "q" is absent.
    return next(iter(params.get("q", [])), "")
def has_tracking_hint(domain: str, path: str, url: str) -> str:
    """Flag a request whose domain, path or URL contains a tracking keyword.

    Args:
        domain: Domain of the request.
        path: Path component of the request URL.
        url: Full request URL.

    Returns:
        "yes" when any entry of TRACKING_WORDS occurs as a substring of the
        lower-cased "domain path url" text, otherwise "no".
    """
    haystack = f"{domain} {path} {url}".lower()
    for keyword in TRACKING_WORDS:
        if keyword in haystack:
            return "yes"
    return "no"
def max_page_load_ms(entries: list[dict], pages: list[dict]) -> float:
    """Return the largest timing found among page onLoad values and entry times.

    Args:
        entries: Items of ``log.entries``; each entry's "time" is considered.
        pages: Items of ``log.pages``; each page's ``pageTimings.onLoad`` is
            considered.

    Returns:
        The maximum numeric value seen, as a float. Non-numeric or missing
        values are ignored, and the result is 0.0 when nothing exceeds zero.
    """
    # Pages first, then entries, each defaulting to -1 when the field is absent.
    candidates = [
        (page.get("pageTimings", {}) or {}).get("onLoad", -1) for page in pages
    ]
    candidates.extend(entry.get("time", -1) for entry in entries)

    longest = 0.0
    for candidate in candidates:
        if not isinstance(candidate, (int, float)):
            continue
        if candidate > longest:
            longest = float(candidate)
    return longest
def main_domain_for_engine(search_engine: str) -> str:
    """Return the domain fragment treated as first-party for a search engine.

    Args:
        search_engine: Engine label, e.g. "Google" or "DuckDuckGo".

    Returns:
        "google." for Google, "duckduckgo.com" for DuckDuckGo, and "" for any
        other label.
    """
    first_party_fragments = {
        "Google": "google.",
        "DuckDuckGo": "duckduckgo.com",
    }
    return first_party_fragments.get(search_engine, "")
def _entry_to_row(
    entry: dict,
    index: int,
    har_filename: str,
    search_engine: str,
    main_domain: str,
) -> dict:
    """Flatten one ``log.entries`` item into a dict keyed by the ENTRY_FIELDS columns."""
    request = entry.get("request", {}) or {}
    response = entry.get("response", {}) or {}
    content = response.get("content", {}) or {}

    # A null url is treated like a missing one.
    url = request.get("url", "") or ""
    parsed = urlparse(url)
    # hostname, unlike netloc, excludes any "user:pass@" prefix and ":port"
    # suffix and is already lower-cased, so the same host is not reported as
    # several different domains.
    domain = parsed.hostname or ""
    path = parsed.path

    request_cookies = request.get("cookies", []) or []
    response_cookies = response.get("cookies", []) or []
    query_items = request.get("queryString", []) or []

    # Third party = the engine's own domain fragment is known and does not
    # occur anywhere in this request's host name.
    is_third_party = bool(main_domain and domain and main_domain not in domain)

    return {
        "har_filename": har_filename,
        "search_engine": search_engine,
        "entry_index": index,
        "startedDateTime": entry.get("startedDateTime", ""),
        "time_ms": entry.get("time", ""),
        "method": request.get("method", ""),
        "url": url,
        "domain": domain,
        "path": path,
        "query_text": extract_query_text_from_url(url),
        "status": response.get("status", ""),
        "statusText": response.get("statusText", ""),
        "request_cookie_count": len(request_cookies),
        "request_cookie_names": cookie_names(request_cookies),
        "request_cookie_values": cookie_values(request_cookies),
        "response_cookie_count": len(response_cookies),
        "response_cookie_names": cookie_names(response_cookies),
        "response_cookie_values": cookie_values(response_cookies),
        "query_param_count": len(query_items),
        "query_param_names": query_names(query_items),
        "query_param_values": query_values(query_items),
        "request_header_count": len(request.get("headers", []) or []),
        "response_header_count": len(response.get("headers", []) or []),
        "post_data_present": "yes" if request.get("postData") else "no",
        "request_body_size": request.get("bodySize", ""),
        "response_body_size": response.get("bodySize", ""),
        "response_content_size": content.get("size", ""),
        "transferred_bytes_approx": approximate_transferred_bytes(entry),
        "is_third_party_domain": "yes" if is_third_party else "no",
        "tracking_hint": has_tracking_hint(domain, path, url),
    }


def make_entry_rows(har_path: Path) -> list[dict]:
    """Read a HAR file and return one CSV row dict per ``log.entries`` item.

    Args:
        har_path: HAR file to convert. Its name is also used to guess the
            search engine, which decides what counts as a third-party domain.

    Returns:
        Row dicts in entry order, with "entry_index" starting at 1.
    """
    entries = entries_from_har(read_har(har_path))
    search_engine = detect_search_engine(har_path)
    main_domain = main_domain_for_engine(search_engine)
    return [
        _entry_to_row(entry, index, har_path.name, search_engine, main_domain)
        for index, entry in enumerate(entries, start=1)
    ]
def make_summary_row(har_path: Path, entry_rows: list[dict]) -> dict:
    """Build the one-line summary for a HAR file from its per-entry rows.

    Args:
        har_path: HAR file being summarised; it is read again here to obtain
            the entries and pages used for the page-load figure.
        entry_rows: Row dicts previously produced for this file.

    Returns:
        A dict keyed by the summary CSV columns.
    """
    har_data = read_har(har_path)
    entries = entries_from_har(har_data)
    pages = pages_from_har(har_data)

    def total(column: str) -> int:
        # Sum of an integer-valued column across all rows.
        return sum(int(row[column]) for row in entry_rows)

    def count_where(column: str, wanted: str) -> int:
        # Number of rows whose column equals the wanted value.
        return sum(1 for row in entry_rows if row[column] == wanted)

    distinct_domains = set()
    first_query = ""
    by_status_class = dict.fromkeys((2, 3, 4, 5), 0)
    for row in entry_rows:
        if row["domain"]:
            distinct_domains.add(row["domain"])
        # Keep the first non-empty query text encountered.
        if not first_query and row["query_text"]:
            first_query = row["query_text"]
        code = row["status"]
        if not isinstance(code, int):
            continue
        hundreds = code // 100
        if hundreds in by_status_class:
            by_status_class[hundreds] += 1

    transferred_bytes = total("transferred_bytes_approx")

    summary = {
        "har_filename": har_path.name,
        "search_engine": detect_search_engine(har_path),
        "query_text": first_query,
        "requests_total": len(entry_rows),
        "unique_domains": len(distinct_domains),
        "third_party_requests": count_where("is_third_party_domain", "yes"),
        "request_cookies_total": total("request_cookie_count"),
        "response_cookies_total": total("response_cookie_count"),
        "query_params_total": total("query_param_count"),
        "post_requests_total": count_where("method", "POST"),
        "tracking_hint_requests": count_where("tracking_hint", "yes"),
        "transferred_kb_approx": round(transferred_bytes / 1024, 2),
        "page_load_ms": round(max_page_load_ms(entries, pages), 2),
    }
    for hundreds in (2, 3, 4, 5):
        summary[f"status_{hundreds}xx"] = by_status_class[hundreds]
    return summary
def write_csv(path: Path, fieldnames: list[str], rows: list[dict]) -> None:
    """Write ``rows`` to ``path`` as a UTF-8 CSV file with a header line.

    Args:
        path: Output file; it is overwritten if it exists. Missing parent
            folders are created.
        fieldnames: Column names, in output order.
        rows: One dict per CSV row, keyed by ``fieldnames``.

    Raises:
        ValueError: If a row contains a key that is not in ``fieldnames``
            (csv.DictWriter's default behaviour).
        OSError: If the file or its parent folder cannot be created.
    """
    # Allow e.g. "--entries-output out/har_entries.csv" when "out" does not
    # exist yet, instead of failing only after all parsing work is done.
    path.parent.mkdir(parents=True, exist_ok=True)
    # newline="" lets the csv module control line endings itself.
    with path.open("w", newline="", encoding="utf-8") as file:
        writer = csv.DictWriter(file, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)
def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the converter.

    Returns:
        A namespace with ``input_dir``, ``entries_output`` and
        ``summary_output``, each a Path (defaults: data, har_entries.csv,
        har_summary.csv).
    """
    # (flag, default, help text) for every option; all of them are paths.
    option_specs = (
        (
            "--input-dir",
            Path("data"),
            "Folder with .har files. Default: data",
        ),
        (
            "--entries-output",
            Path("har_entries.csv"),
            "CSV with one row per log.entries item. Default: har_entries.csv",
        ),
        (
            "--summary-output",
            Path("har_summary.csv"),
            "CSV with one row per HAR file. Default: har_summary.csv",
        ),
    )
    cli = argparse.ArgumentParser(description="Convert HAR files to readable CSV files.")
    for flag, fallback, help_text in option_specs:
        cli.add_argument(flag, type=Path, default=fallback, help=help_text)
    return cli.parse_args()
def main() -> None:
    """Convert every ``*.har`` file in the input folder into the two CSV files.

    Raises:
        SystemExit: If the input folder contains no ``*.har`` files.
    """
    args = parse_args()

    # Sorted so the output row order is stable across runs.
    har_paths = sorted(args.input_dir.glob("*.har"))
    if len(har_paths) == 0:
        raise SystemExit(f"No HAR files found in {args.input_dir}")

    collected_entries = []
    collected_summaries = []
    for har_path in har_paths:
        rows_for_file = make_entry_rows(har_path)
        collected_entries += rows_for_file
        collected_summaries.append(make_summary_row(har_path, rows_for_file))

    # Write both files before reporting, so nothing is announced on failure.
    write_csv(args.entries_output, ENTRY_FIELDS, collected_entries)
    write_csv(args.summary_output, SUMMARY_FIELDS, collected_summaries)

    print(f"Wrote {len(collected_entries)} entry rows to {args.entries_output}")
    print(f"Wrote {len(collected_summaries)} summary rows to {args.summary_output}")


if __name__ == "__main__":
    main()