# Source code for scripts.merge_entities

"""Merge the 8 ENTITIES_part_NN.json files into a consolidated ENTITIES.md.

Deduplicates entities by (name_en, name_fr) and aggregates source_acts.
Groups by category, sorts by frequency.
"""

from __future__ import annotations

import glob
import json
import os
from collections import defaultdict

CHUNKS_DIR = "scripts/chunks"
OUTPUT = "ENTITIES.md"

# Human-readable section headings for each entity category in the report.
# Categories not listed here fall back to cat.title() when rendering.
CATEGORY_LABELS = {
    "financial_institution": "Financial Institutions",
    "fund": "Funds & Collective Investment",
    "market_infrastructure": "Market Infrastructure",
    "investor": "Investors & Clients",
    "issuer": "Issuers & Listed Entities",
    "authority": "Authorities & Regulators",
    "service_provider": "Service Providers",
    "employee_role": "Roles & Personnel",
    "other": "Other",
}


def normalize(s: str | None) -> str:
    """Normalize entity labels for case-insensitive deduplication.

    Args:
        s: Raw label text, or ``None`` when the field is absent.

    Returns:
        The label stripped of surrounding whitespace and lower-cased;
        ``""`` when *s* is ``None`` or empty.
    """
    return (s or "").strip().lower()
def make_key(entity: dict) -> str:
    """Build the deduplication key for one entity record.

    Prefers the normalized English name and falls back to the French
    name. Returns ``""`` when the record carries neither, which the
    caller treats as "skip this entity".

    Args:
        entity: One raw entity dict as read from a chunk JSON file.

    Returns:
        A lower-cased, whitespace-trimmed key string (possibly empty).
    """
    en = normalize(entity.get("name_en"))
    fr = normalize(entity.get("name_fr"))
    return en or fr
def main() -> None:
    """Merge partial entity inventories into a consolidated Markdown report.

    Reads every ``ENTITIES_part_*.json`` file under ``CHUNKS_DIR``,
    deduplicates entities by normalized name, groups them by category,
    and writes a frequency-sorted Markdown table per category to
    ``OUTPUT``.
    """
    files = sorted(glob.glob(os.path.join(CHUNKS_DIR, "ENTITIES_part_*.json")))
    if not files:
        print(f"No ENTITIES_part_*.json files found in {CHUNKS_DIR}/")
        return
    print(f"Merging {len(files)} chunk files…")

    merged = _merge_files(files)
    by_cat = _group_by_category(merged)
    _write_markdown(files, merged, by_cat)

    print(f"Wrote {OUTPUT} with {len(merged)} unique entity types across {len(by_cat)} categories.")


def _merge_files(files: list[str]) -> dict[str, dict]:
    """Load each chunk file and merge its entities, keyed by make_key()."""
    # key -> merged entity
    merged: dict[str, dict] = {}
    for path in files:
        with open(path, encoding="utf-8") as f:
            data = json.load(f)
        for ent in data.get("entities", []):
            key = make_key(ent)
            if not key:
                # No EN or FR name at all: nothing to deduplicate on.
                continue
            if key not in merged:
                merged[key] = {
                    "name_en": ent.get("name_en"),
                    "name_fr": ent.get("name_fr"),
                    "definition": ent.get("definition", ""),
                    "category": ent.get("category", "other"),
                    "source_acts": set(ent.get("source_acts", [])),
                }
            else:
                m = merged[key]
                # Prefer non-null names.
                if not m["name_en"] and ent.get("name_en"):
                    m["name_en"] = ent["name_en"]
                if not m["name_fr"] and ent.get("name_fr"):
                    m["name_fr"] = ent["name_fr"]
                # Prefer the longer definition.
                new_def = ent.get("definition", "")
                if len(new_def) > len(m["definition"]):
                    m["definition"] = new_def
                m["source_acts"].update(ent.get("source_acts", []))
    return merged


def _group_by_category(merged: dict[str, dict]) -> dict[str, list[dict]]:
    """Bucket merged entities by category, each bucket sorted by frequency.

    Mutates each entity in place: adds an ``"occurrences"`` count and
    converts ``"source_acts"`` from a set to a sorted list.
    """
    by_cat: dict[str, list[dict]] = defaultdict(list)
    for ent in merged.values():
        ent["occurrences"] = len(ent["source_acts"])
        ent["source_acts"] = sorted(ent["source_acts"])
        by_cat[ent["category"]].append(ent)
    for lst in by_cat.values():
        lst.sort(key=lambda e: -e["occurrences"])
    return by_cat


def _write_markdown(files: list[str], merged: dict[str, dict], by_cat: dict[str, list[dict]]) -> None:
    """Render the consolidated report to OUTPUT as per-category Markdown tables."""
    with open(OUTPUT, "w", encoding="utf-8") as f:
        f.write("# Entity Types — EUR-Lex Scope Analysis\n\n")
        # NOTE: this sentence was split across two physical lines in the
        # extracted source; rejoined into a single f-string literal.
        f.write(
            f"Consolidated from {len(files)} sub-agent outputs. "
            f"Total unique entity types: **{len(merged)}**\n\n"
        )
        f.write(
            "Sorted by category, then by frequency (number of distinct "
            "EUR-Lex acts where the entity type appears).\n\n---\n\n"
        )
        # Order categories by total occurrences, largest first.
        cat_order = sorted(
            by_cat.keys(),
            key=lambda c: -sum(e["occurrences"] for e in by_cat[c]),
        )
        for cat in cat_order:
            label = CATEGORY_LABELS.get(cat, cat.title())
            entities = by_cat[cat]
            f.write(f"## {label} ({len(entities)})\n\n")
            f.write("| # | EN | FR | Definition | Occ. | Example acts |\n")
            f.write("|---|----|----|-----------|------|--------------|\n")
            for i, ent in enumerate(entities, 1):
                en = ent["name_en"] or "—"
                fr = ent["name_fr"] or "—"
                # Keep table cells on one line; escape the column separator.
                defn = (ent["definition"] or "—").replace("\n", " ").replace("|", "\\|")
                if len(defn) > 150:
                    defn = defn[:150] + "…"
                occ = ent["occurrences"]
                examples = ", ".join(ent["source_acts"][:3])
                f.write(f"| {i} | {en} | {fr} | {defn} | {occ} | {examples} |\n")
            f.write("\n")
if __name__ == "__main__":
    main()