#!/usr/bin/env python3
"""
Convert HAR files to readable CSV files.

Output 1: har_entries.csv
    One row per entry in log.entries.
    This is the most direct way to inspect the HAR structure:
    each { ... } inside entries[] becomes one CSV row.

Output 2: har_summary.csv
    One row per HAR file with simple totals.

The script does not remove cookie values or URLs.
Treat the output as sensitive.
"""

from __future__ import annotations

import argparse
import csv
import json
from pathlib import Path
from urllib.parse import parse_qs, urlparse

# Column order of the per-entry CSV (one row per log.entries item).
ENTRY_FIELDS = [
    "har_filename",
    "search_engine",
    "entry_index",
    "startedDateTime",
    "time_ms",
    "method",
    "url",
    "domain",
    "path",
    "query_text",
    "status",
    "statusText",
    "request_cookie_count",
    "request_cookie_names",
    "request_cookie_values",
    "response_cookie_count",
    "response_cookie_names",
    "response_cookie_values",
    "query_param_count",
    "query_param_names",
    "query_param_values",
    "request_header_count",
    "response_header_count",
    "post_data_present",
    "request_body_size",
    "response_body_size",
    "response_content_size",
    "transferred_bytes_approx",
    "is_third_party_domain",
    "tracking_hint",
]

# Column order of the per-file summary CSV (one row per HAR file).
SUMMARY_FIELDS = [
    "har_filename",
    "search_engine",
    "query_text",
    "requests_total",
    "unique_domains",
    "third_party_requests",
    "request_cookies_total",
    "response_cookies_total",
    "query_params_total",
    "post_requests_total",
    "tracking_hint_requests",
    "transferred_kb_approx",
    "page_load_ms",
    "status_2xx",
    "status_3xx",
    "status_4xx",
    "status_5xx",
]

# Substrings that mark a request as a possible tracking call. Matching is a
# plain substring test, so this is a hint only (e.g. "log" also matches "blog").
TRACKING_WORDS = [
    "ads",
    "adservice",
    "analytics",
    "collect",
    "conversion",
    "doubleclick",
    "event",
    "gen_204",
    "googleadservices",
    "improving",
    "log",
    "metrics",
    "pagead",
    "telemetry",
    "track",
]


def detect_search_engine(har_path: Path) -> str:
    """Guess the search engine from the HAR file name (case-insensitive)."""
    name = har_path.name.lower()
    if "duckduckgo" in name:
        return "DuckDuckGo"
    if "google" in name:
        return "Google"
    return "Unknown"


def read_har(path: Path) -> dict:
    """Load and parse one HAR (JSON) file.

    The file is decoded as ``utf-8-sig`` so that an optional UTF-8 byte order
    mark is skipped; with plain ``utf-8`` the BOM reaches ``json.load`` and
    makes it fail. Undecodable bytes are replaced rather than raising.

    Raises:
        OSError: if the file cannot be opened.
        json.JSONDecodeError: if the content is not valid JSON.
    """
    with path.open(encoding="utf-8-sig", errors="replace") as file:
        return json.load(file)


def _log_section(har_data: dict) -> dict:
    """Return the ``log`` object of a HAR document, or ``{}`` if unusable."""
    log = har_data.get("log") if isinstance(har_data, dict) else None
    return log if isinstance(log, dict) else {}


def entries_from_har(har_data: dict) -> list[dict]:
    """Return ``log.entries``; an empty list if it is missing or not a list."""
    entries = _log_section(har_data).get("entries")
    return entries if isinstance(entries, list) else []


def pages_from_har(har_data: dict) -> list[dict]:
    """Return ``log.pages``; an empty list if it is missing or not a list."""
    pages = _log_section(har_data).get("pages")
    return pages if isinstance(pages, list) else []


def _join_field(items: list[dict], key: str) -> str:
    """Join ``item[key]`` of every item with ``|``.

    Missing keys, ``None`` values and non-dict items contribute an empty
    string; any other non-string value is converted with ``str`` so that the
    join cannot raise ``TypeError``.
    """
    parts = []
    for item in items:
        value = item.get(key, "") if isinstance(item, dict) else ""
        parts.append("" if value is None else str(value))
    return "|".join(parts)


def cookie_names(cookies: list[dict]) -> str:
    """Return all cookie names joined with ``|``."""
    return _join_field(cookies, "name")


def cookie_values(cookies: list[dict]) -> str:
    """Return all cookie values joined with ``|``."""
    return _join_field(cookies, "value")


def query_names(query_items: list[dict]) -> str:
    """Return all query-string parameter names joined with ``|``."""
    return _join_field(query_items, "name")


def query_values(query_items: list[dict]) -> str:
    """Return all query-string parameter values joined with ``|``."""
    return _join_field(query_items, "value")


def _is_number(value: object) -> bool:
    """Return True for int/float values, excluding bool."""
    return isinstance(value, (int, float)) and not isinstance(value, bool)


def positive_number(value: object) -> int:
    """Return ``int(value)`` for a number greater than zero, otherwise 0.

    Non-numeric values, booleans, zero and negative numbers all give 0. HAR
    files use -1 for "size not available", which therefore counts as 0.
    """
    if _is_number(value) and value > 0:
        return int(value)
    return 0


def _response_body_bytes(response: dict) -> int:
    """Return the size of the response body as received.

    ``bodySize`` is used when it is a non-negative number. Otherwise the size
    is estimated as ``content.size`` minus ``content.compression``, floored
    at zero.
    """
    body_size = response.get("bodySize")
    if _is_number(body_size) and body_size >= 0:
        return int(body_size)
    content = response.get("content", {}) or {}
    decoded_size = positive_number(content.get("size"))
    bytes_saved = positive_number(content.get("compression"))
    return max(decoded_size - bytes_saved, 0)


def approximate_transferred_bytes(entry: dict) -> int:
    """Estimate the bytes sent and received for one HAR entry.

    Sums request headers, request body, response headers and response body.
    The response body is counted once: ``content.size`` is only a fallback
    for an unknown ``bodySize``, because adding both would count the same
    body twice.
    """
    request = entry.get("request", {}) or {}
    response = entry.get("response", {}) or {}
    return (
        positive_number(request.get("headersSize"))
        + positive_number(request.get("bodySize"))
        + positive_number(response.get("headersSize"))
        + _response_body_bytes(response)
    )


def extract_query_text_from_url(url: str) -> str:
    """Return the first value of the ``q`` URL parameter, or ``""``."""
    parsed = urlparse(url)
    query = parse_qs(parsed.query, keep_blank_values=True)
    values = query.get("q", [])
    return values[0] if values else ""


def has_tracking_hint(domain: str, path: str, url: str) -> str:
    """Return ``"yes"`` if any TRACKING_WORDS substring occurs, else ``"no"``.

    The domain, path and full URL are searched case-insensitively.
    """
    text = f"{domain} {path} {url}".lower()
    return "yes" if any(word in text for word in TRACKING_WORDS) else "no"


def max_page_load_ms(entries: list[dict], pages: list[dict]) -> float:
    """Return the largest of all page ``onLoad`` and entry ``time`` values.

    Missing or non-numeric values are ignored; the result is never below 0.0.
    """
    max_time = 0.0
    for page in pages:
        on_load = (page.get("pageTimings", {}) or {}).get("onLoad", -1)
        if isinstance(on_load, (int, float)) and on_load > max_time:
            max_time = float(on_load)
    for entry in entries:
        entry_time = entry.get("time", -1)
        if isinstance(entry_time, (int, float)) and entry_time > max_time:
            max_time = float(entry_time)
    return max_time


def main_domain_for_engine(search_engine: str) -> str:
    """Return the first-party domain fragment for an engine, or ``""``."""
    if search_engine == "Google":
        return "google."
    if search_engine == "DuckDuckGo":
        return "duckduckgo.com"
    return ""


def make_entry_rows(har_path: Path, har_data: dict | None = None) -> list[dict]:
    """Build one CSV row (keyed by ENTRY_FIELDS) per entry of a HAR file.

    Args:
        har_path: Path of the HAR file; its name selects the search engine.
        har_data: Already parsed HAR content. When omitted, the file is read
            from ``har_path``.

    Returns:
        The rows in entry order, with ``entry_index`` starting at 1.
    """
    if har_data is None:
        har_data = read_har(har_path)
    entries = entries_from_har(har_data)
    search_engine = detect_search_engine(har_path)
    main_domain = main_domain_for_engine(search_engine)

    rows: list[dict] = []
    for index, entry in enumerate(entries, start=1):
        request = entry.get("request", {}) or {}
        response = entry.get("response", {}) or {}
        content = response.get("content", {}) or {}

        url = request.get("url", "")
        parsed = urlparse(url)
        request_cookies = request.get("cookies", []) or []
        response_cookies = response.get("cookies", []) or []
        query_items = request.get("queryString", []) or []

        domain = parsed.netloc.lower()
        path = parsed.path
        query_text = extract_query_text_from_url(url)

        # Substring heuristic: with an unknown engine or an empty domain the
        # request is never flagged as third party.
        third_party = "no"
        if main_domain and domain and main_domain not in domain:
            third_party = "yes"

        rows.append(
            {
                "har_filename": har_path.name,
                "search_engine": search_engine,
                "entry_index": index,
                "startedDateTime": entry.get("startedDateTime", ""),
                "time_ms": entry.get("time", ""),
                "method": request.get("method", ""),
                "url": url,
                "domain": domain,
                "path": path,
                "query_text": query_text,
                "status": response.get("status", ""),
                "statusText": response.get("statusText", ""),
                "request_cookie_count": len(request_cookies),
                "request_cookie_names": cookie_names(request_cookies),
                "request_cookie_values": cookie_values(request_cookies),
                "response_cookie_count": len(response_cookies),
                "response_cookie_names": cookie_names(response_cookies),
                "response_cookie_values": cookie_values(response_cookies),
                "query_param_count": len(query_items),
                "query_param_names": query_names(query_items),
                "query_param_values": query_values(query_items),
                "request_header_count": len(request.get("headers", []) or []),
                "response_header_count": len(response.get("headers", []) or []),
                "post_data_present": "yes" if request.get("postData") else "no",
                "request_body_size": request.get("bodySize", ""),
                "response_body_size": response.get("bodySize", ""),
                "response_content_size": content.get("size", ""),
                "transferred_bytes_approx": approximate_transferred_bytes(entry),
                "is_third_party_domain": third_party,
                "tracking_hint": has_tracking_hint(domain, path, url),
            }
        )
    return rows


def make_summary_row(
    har_path: Path,
    entry_rows: list[dict],
    har_data: dict | None = None,
) -> dict:
    """Build the summary CSV row (keyed by SUMMARY_FIELDS) for one HAR file.

    Args:
        har_path: Path of the HAR file.
        entry_rows: Rows produced by ``make_entry_rows`` for the same file.
        har_data: Already parsed HAR content. When omitted, the file is read
            from ``har_path``.

    Returns:
        Totals over ``entry_rows`` plus the page load time taken from the
        HAR pages and entries.
    """
    if har_data is None:
        har_data = read_har(har_path)
    entries = entries_from_har(har_data)
    pages = pages_from_har(har_data)

    domains = {row["domain"] for row in entry_rows if row["domain"]}
    status_counts = {2: 0, 3: 0, 4: 0, 5: 0}
    query_text = ""

    for row in entry_rows:
        # The first non-empty query text found is reported for the file.
        if row["query_text"] and not query_text:
            query_text = row["query_text"]
        status = row["status"]
        if isinstance(status, int):
            group = status // 100
            if group in status_counts:
                status_counts[group] += 1

    transferred_bytes = sum(int(row["transferred_bytes_approx"]) for row in entry_rows)

    return {
        "har_filename": har_path.name,
        "search_engine": detect_search_engine(har_path),
        "query_text": query_text,
        "requests_total": len(entry_rows),
        "unique_domains": len(domains),
        "third_party_requests": sum(
            1 for row in entry_rows if row["is_third_party_domain"] == "yes"
        ),
        "request_cookies_total": sum(int(row["request_cookie_count"]) for row in entry_rows),
        "response_cookies_total": sum(
            int(row["response_cookie_count"]) for row in entry_rows
        ),
        "query_params_total": sum(int(row["query_param_count"]) for row in entry_rows),
        "post_requests_total": sum(1 for row in entry_rows if row["method"] == "POST"),
        "tracking_hint_requests": sum(1 for row in entry_rows if row["tracking_hint"] == "yes"),
        "transferred_kb_approx": round(transferred_bytes / 1024, 2),
        "page_load_ms": round(max_page_load_ms(entries, pages), 2),
        "status_2xx": status_counts[2],
        "status_3xx": status_counts[3],
        "status_4xx": status_counts[4],
        "status_5xx": status_counts[5],
    }


def write_csv(path: Path, fieldnames: list[str], rows: list[dict]) -> None:
    """Write ``rows`` to ``path`` as UTF-8 CSV with a header line."""
    # newline="" lets the csv module control line endings itself.
    with path.open("w", newline="", encoding="utf-8") as file:
        writer = csv.DictWriter(file, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)


def parse_args() -> argparse.Namespace:
    """Parse the command-line options (input folder and two output paths)."""
    parser = argparse.ArgumentParser(description="Convert HAR files to readable CSV files.")
    parser.add_argument(
        "--input-dir",
        type=Path,
        default=Path("data"),
        help="Folder with .har files. Default: data",
    )
    parser.add_argument(
        "--entries-output",
        type=Path,
        default=Path("har_entries.csv"),
        help="CSV with one row per log.entries item. Default: har_entries.csv",
    )
    parser.add_argument(
        "--summary-output",
        type=Path,
        default=Path("har_summary.csv"),
        help="CSV with one row per HAR file. Default: har_summary.csv",
    )
    return parser.parse_args()


def main() -> None:
    """Convert every ``*.har`` file in the input folder into the two CSVs.

    Raises:
        SystemExit: if the input folder contains no ``.har`` files.
    """
    args = parse_args()
    har_files = sorted(args.input_dir.glob("*.har"))
    if not har_files:
        raise SystemExit(f"No HAR files found in {args.input_dir}")

    all_entry_rows = []
    summary_rows = []

    for har_path in har_files:
        # Parse each file once and share the result between both builders.
        har_data = read_har(har_path)
        entry_rows = make_entry_rows(har_path, har_data)
        all_entry_rows.extend(entry_rows)
        summary_rows.append(make_summary_row(har_path, entry_rows, har_data))

    write_csv(args.entries_output, ENTRY_FIELDS, all_entry_rows)
    write_csv(args.summary_output, SUMMARY_FIELDS, summary_rows)

    print(f"Wrote {len(all_entry_rows)} entry rows to {args.entries_output}")
    print(f"Wrote {len(summary_rows)} summary rows to {args.summary_output}")


if __name__ == "__main__":
    main()